데이터 통합PythonLightweight transforms경량 변환 API

본 번역은 검증되지 않았습니다. AIP를 통해 영문원문으로부터 번역되었습니다.

경량 변환 API

경량 변환은 PySpark 쿼리를 실행하는 것을 지원하지 않습니다. 대신, 대체 API를 사용하여 쿼리를 작성해야 합니다.

Spark 변환은 특히 데이터가 로컬에 이미 존재할 때 많은 노드 간 계산을 확장하는 데 상당한 플랫폼 기능을 제공합니다. 그러나 많은 데이터 변환은 단일 기계로도 관리될 수 있습니다. 단일 기계로 데이터 처리가 충분한 경우에는 Spark를 사용하지 않고 단일 노드 사용 사례에 더 최적화된 컴퓨트 엔진을 사용하여 인프라 오버헤드를 줄일 수 있습니다.

이 문서는 변환 API에 대한 @lightweight 데코레이터를 설명합니다. 이 데코레이터는 Spark를 사용하지 않고 최대 약 1천만 행의 데이터셋 처리에 적합한 인프라를 요청하기 위해 @transform@transform_pandas 위에 배치할 수 있습니다. @lightweight는 또한 단일 노드 변환에 최적화된 최신 컴퓨트 엔진인 Polars와의 일류 통합을 제공합니다.

실제 성능은 파이프라인과 데이터의 복잡성에 따라 다릅니다. 따라서 @lightweight 백엔드를 사용하면서 변환의 실행 시간을 비교하는 것이 좋습니다.

경량 백엔드로 실행할 때 Spark DataFrames 및 Spark 컨텍스트는 사용할 수 없습니다.

@lightweight를 사용하면 기존 변환 기능 중 많은 것을 사용할 수 있습니다. 동일한 리포지토리에서 일반 및 경량 변환을 혼합하고, 미리보기하고, Marketplace를 통해 경량 변환을 패키지 및 설치할 수 있습니다. 그러나 지원되지 않는 기능도 있으며, 아래에서 더 자세히 알아볼 수 있습니다.

API 주요 내용

다음 섹션에서는 경량 변환 API를 보여줍니다. 다양한 데이터 처리 엔진과 상호 작용하기 위해 API가 사용되는 구체적인 예제를 보려면 경량 예제를 검토하세요.

@lightweight를 사용하기 전에 다음 사전 요구 사항 단계를 수행하세요:

  1. Python 리포지토리를 최신 버전으로 업그레이드합니다.
  2. Libraries 탭에서 foundry-transforms-lib-python을 설치합니다.

리소스 프로비저닝

Spark 프로필에 의존하여 리소스를 요청하는 대신, cpu_coresmemory_gb 또는 memory_mb 키워드 인수를 통해 데코레이터를 호출할 때 리소스를 더 세분화하여 요청할 수 있습니다. 기본적으로 최대 허용 값은 8개의 코어와 32GB의 메모리입니다. 이러한 제한을 늘리려면 Palantir 지원에 문의하세요.

Copied!
1 2 3 4 5 6 7 8 import polars as pl # polars 라이브러리를 불러옵니다. polars는 큰 데이터 프레임을 빠르게 처리할 수 있는 라이브러리입니다. from transforms.api import transform, Input, Output, lightweight # transforms.api에서 필요한 함수와 클래스를 불러옵니다. @lightweight(cpu_cores=3.4, memory_gb=16) # lightweight 데코레이터를 사용하여 해당 함수에 할당된 CPU 코어와 메모리를 제한합니다. @transform(output=Output('/Project/folder/output'), dataset=Input('/Project/folder/input')) # transform 데코레이터를 사용하여 입력 및 출력 경로를 설정합니다. def compute(output, dataset): # compute라는 함수를 정의합니다. # dataset을 polars 형식으로 읽고, 'Name' 열이 'A'로 시작하는 행만 필터링하여 output에 씁니다. output.write_table(dataset.polars(lazy=True).filter(pl.col('Name').str.starts_with('A')))

변환에 필요한 리소스를 미세조정할 수 있도록 @lightweight()에 리소스 요청을 인수로 전달할 수 있습니다. 스니펫에 있는 값은 일반적으로 Spark 변환의 기본값을 반영합니다.

리소스 프로비저닝 API의 추가적인 이점은 이제 GPU를 간편하게 요청할 수 있다는 것입니다:

Copied!
1 2 3 4 5 6 7 8 import torch # pytorch와 pytorch-gpu를 meta.yaml에 추가하는 것을 잊지 마세요 import logging from transforms.api import transform, Output, lightweight @lightweight(gpu_type='NVIDIA_T4') @transform(out=Output('/Project/folder/output')) def compute(out): logging.info(torch.cuda.get_device_name(0)) # 현재 사용 중인 GPU 이름을 로깅합니다.

위 코드 조각은 사용자의 Foundry 등록이 NVIDIA T4 GPU로 구성되어 있으며, 프로젝트에서 사용할 수 있음을 가정합니다.

데이터 형식

@transform_pandas 위에 @lightweight를 사용할 때는 @lightweight 없이 사용하는 것과 동일한 API를 사용할 수 있습니다. @transform 위에 @lightweight를 사용하면 사용자 함수의 입력 및 출력에 대한 추가 메서드를 제공합니다.

Copied!
1 2 3 4 5 6 7 8 @lightweight @transform(output=Output('/Project/folder/output'), dataset=Input('/Project/folder/input')) def compute(output, dataset): polars_df = dataset.polars() # polars_df는 polars.DataFrame 객체입니다 lazy_df = dataset.polars(lazy=True) # 스트리밍 모드를 활성화합니다, lazy_df는 polars.LazyFrame입니다 pandas_df = dataset.pandas() # pandas_df는 pandas.DataFrame 객체입니다 arrow_table = dataset.arrow() # arrow_table은 pyarrow.Table입니다 out.write_table(lazy_df) # 위의 어떤 형식도 write_table에 전달될 수 있습니다

위의 스니펫을 참조하여 사용 가능한 데이터셋 형식을 확인하십시오. dataset.pandas()를 호출하는 것은 Pandas가 환경에 설치되어 있음을 요구합니다. 마찬가지로, dataset.polars(...)는 Polars가 이용 가능해야 합니다.

파일 접근하기

Lightweight 입력과 출력은 .filesystem()과 같은 잘 알려진 메소드도 노출합니다. 아래의 스니펫은 비구조화된 파일@lightweight 없이 처리될 수 있는 것과 같은 방식으로 처리될 수 있다는 것을 보여줍니다.

Copied!
1 2 3 4 5 6 7 8 9 10 11 @lightweight @transform(my_output=Output('/Project/folder/output'), my_input=Input('/Project/folder/input')) def files(my_input, my_output): # 입력 경로의 모든 파일을 반복 for file in my_input.filesystem().ls(): # 입력 파일을 바이너리 모드로 읽기 with my_input.filesystem().open(file.path, "rb") as f1: # 출력 파일을 바이너리 모드로 쓰기 with my_output.filesystem().open(file.path, "wb") as f2: # 입력 파일의 내용을 출력 파일에 쓰기 f2.write(f1.read())

변환 생성기

Lightweight 변환과 함께 변환 생성기 사용이 지원됩니다.

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def create_transforms(): results = [] # 10과 20의 크기로 반복 for size in [10, 20]: # 경량 데코레이터 @lightweight # 변환 데코레이터, 입력 및 출력 경로 설정 @transform( output=Output(f"{root_folder}/demo-outputs/lightweight-polars-{size}"), df=Input(f"{root_folder}/demo-inputs/people-{size}") ) # 경량 Polars 구현 함수 정의 def lightweight_polars(output, df): # Polars 구현을 사용하여 결과 테이블 작성 output.write_table(polars_implementation(df.polars(lazy=True))) # 결과 목록에 경량 Polars 함수 추가 results.append(lightweight(lightweight_polars)) return results # 변환 함수 생성 TRANSFORMS = create_transforms()

Lightweight 변환의 한계

다음 기능들은 아직 Lightweight 변환에서 지원되지 않습니다:

  • 데이터 기대치

Polars

Polars는 단일 노드 변환에 최적화된 현대적인 계산 엔진입니다. Polars는 Rust로 작성되었고 파이프라인 실행 중에 네이티브 가속을 활용할 수 있도록 Python 래퍼를 제공합니다.

예를 들어, 이것이 Palantir에서 Lightweight 변환을 벤치마크하는 데 사용하는 Polars 파이프라인입니다:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 def polars_implementation(polars_df): polars_df = polars_df.with_columns( pl.col("id").cast(pl.Int64).alias("id") ) # id 열을 Int64 형식으로 변환하고 열 이름을 id로 지정합니다. reciprocated_follows = ( polars_df .explode("follows") .select([ pl.col("id").alias("id1"), pl.col("follows").cast(pl.Int64).alias("id2"), ]) ) # follows 열을 펼친 다음, id 열을 id1으로, follows 열을 Int64 형식으로 변환하여 id2로 이름을 지정합니다. return ( polars_df .join( reciprocated_follows .join( reciprocated_follows, left_on=["id1", "id2"], right_on=["id2", "id1"], how="inner" ) # reciprocated_follows를 자기 자신과 조인하여 상호 팔로우를 찾습니다. .group_by("id1") .agg(pl.count("id2").alias("reciprocated_follows_count")), # id1 별로 그룹화하여 상호 팔로우 수를 계산합니다. left_on="id", right_on="id1", how="left", ) .drop(["email", "dob", "id1", "follows"]) # 필요 없는 열들(email, dob, id1, follows)을 제거합니다. )

Polars 스트리밍 모드

Polars는 Lightweight 변환을 작성하기 위한 데이터 처리 라이브러리로 사용하기를 추천합니다. 가능한 경우, 데이터를 청크 단위로 로드하고 사용 가능한 메모리보다 크기가 큰 데이터셋을 처리할 수 있도록 하는 스트리밍 모드에서 Polars를 사용하는 것이 좋습니다. .polars(lazy=True) 메소드를 호출하여 스트리밍 모드에 접근할 수 있습니다. 현재, UDF와 정렬은 스트리밍 모드에서 완벽하게 지원되지 않습니다.

변환 작업이 오케스트레이션 레이어에 의해 관리되는 컨테이너에서 실행되는 경우, 컨테이너의 메모리 제한이 초과되면 컨테이너가 종료될 수 있습니다. 스트리밍 모드에서 Polars는 이 제한을 인식하지 못하고, 이 제한을 초과할 수 있습니다. 메모리 부족 (OOM) 오류가 발생하면, @lightweight(memory_gb=32) 또는 다른 적절한 값으로 컨테이너의 메모리 제한을 늘려주세요.

다음 단계

변환 API 출처에서 Lightweight API에 대해 더 알아보거나 Lightweight 예시에서 실제 유즈케이스를 고려해보세요.