경량 변환은 PySpark 쿼리를 실행하는 것을 지원하지 않습니다. 대신, 대체 API를 사용하여 쿼리를 작성해야 합니다.
Spark 변환은 특히 데이터가 로컬에 이미 존재할 때 많은 노드 간 계산을 확장하는 데 상당한 플랫폼 기능을 제공합니다. 그러나 많은 데이터 변환은 단일 기계로도 관리될 수 있습니다. 단일 기계로 데이터 처리가 충분한 경우에는 Spark를 사용하지 않고 단일 노드 사용 사례에 더 최적화된 컴퓨트 엔진을 사용하여 인프라 오버헤드를 줄일 수 있습니다.
이 문서는 변환 API에 대한 @lightweight
데코레이터를 설명합니다. 이 데코레이터는 Spark를 사용하지 않고 최대 약 1천만 행의 데이터셋 처리에 적합한 인프라를 요청하기 위해 @transform
및 @transform_pandas
위에 배치할 수 있습니다. @lightweight
는 또한 단일 노드 변환에 최적화된 최신 컴퓨트 엔진인 Polars와의 일류 통합을 제공합니다.
실제 성능은 파이프라인과 데이터의 복잡성에 따라 다릅니다. 따라서 @lightweight
백엔드를 사용하면서 변환의 실행 시간을 비교하는 것이 좋습니다.
경량 백엔드로 실행할 때 Spark DataFrames 및 Spark 컨텍스트는 사용할 수 없습니다.
@lightweight
를 사용하면 기존 변환 기능 중 많은 것을 사용할 수 있습니다. 동일한 리포지토리에서 일반 및 경량 변환을 혼합하고, 미리보기하고, Marketplace를 통해 경량 변환을 패키지 및 설치할 수 있습니다. 그러나 지원되지 않는 기능도 있으며, 아래에서 더 자세히 알아볼 수 있습니다.
다음 섹션에서는 경량 변환 API를 보여줍니다. 다양한 데이터 처리 엔진과 상호 작용하기 위해 API가 사용되는 구체적인 예제를 보려면 경량 예제를 검토하세요.
@lightweight
를 사용하기 전에 다음 사전 요구 사항 단계를 수행하세요:
foundry-transforms-lib-python
을 설치합니다.Spark 프로필에 의존하여 리소스를 요청하는 대신, cpu_cores
와 memory_gb
또는 memory_mb
키워드 인수를 통해 데코레이터를 호출할 때 리소스를 더 세분화하여 요청할 수 있습니다. 기본적으로 최대 허용 값은 8개의 코어와 32GB의 메모리입니다. 이러한 제한을 늘리려면 Palantir 지원에 문의하세요.
Copied!1 2 3 4 5 6 7 8
import polars as pl # polars 라이브러리를 불러옵니다. polars는 큰 데이터 프레임을 빠르게 처리할 수 있는 라이브러리입니다. from transforms.api import transform, Input, Output, lightweight # transforms.api에서 필요한 함수와 클래스를 불러옵니다. @lightweight(cpu_cores=3.4, memory_gb=16) # lightweight 데코레이터를 사용하여 해당 함수에 할당된 CPU 코어와 메모리를 제한합니다. @transform(output=Output('/Project/folder/output'), dataset=Input('/Project/folder/input')) # transform 데코레이터를 사용하여 입력 및 출력 경로를 설정합니다. def compute(output, dataset): # compute라는 함수를 정의합니다. # dataset을 polars 형식으로 읽고, 'Name' 열이 'A'로 시작하는 행만 필터링하여 output에 씁니다. output.write_table(dataset.polars(lazy=True).filter(pl.col('Name').str.starts_with('A')))
변환에 필요한 리소스를 미세조정할 수 있도록 @lightweight()
에 리소스 요청을 인수로 전달할 수 있습니다. 스니펫에 있는 값은 일반적으로 Spark 변환의 기본값을 반영합니다.
리소스 프로비저닝 API의 추가적인 이점은 이제 GPU를 간편하게 요청할 수 있다는 것입니다:
Copied!1 2 3 4 5 6 7 8
import torch # pytorch와 pytorch-gpu를 meta.yaml에 추가하는 것을 잊지 마세요 import logging from transforms.api import transform, Output, lightweight @lightweight(gpu_type='NVIDIA_T4') @transform(out=Output('/Project/folder/output')) def compute(out): logging.info(torch.cuda.get_device_name(0)) # 현재 사용 중인 GPU 이름을 로깅합니다.
위 코드 조각은 사용자의 Foundry 등록이 NVIDIA T4 GPU로 구성되어 있으며, 프로젝트에서 사용할 수 있음을 가정합니다.
@transform_pandas
위에 @lightweight
를 사용할 때는 @lightweight
없이 사용하는 것과 동일한 API를 사용할 수 있습니다. @transform
위에 @lightweight
를 사용하면 사용자 함수의 입력 및 출력에 대한 추가 메서드를 제공합니다.
Copied!1 2 3 4 5 6 7 8
@lightweight @transform(output=Output('/Project/folder/output'), dataset=Input('/Project/folder/input')) def compute(output, dataset): polars_df = dataset.polars() # polars_df는 polars.DataFrame 객체입니다 lazy_df = dataset.polars(lazy=True) # 스트리밍 모드를 활성화합니다, lazy_df는 polars.LazyFrame입니다 pandas_df = dataset.pandas() # pandas_df는 pandas.DataFrame 객체입니다 arrow_table = dataset.arrow() # arrow_table은 pyarrow.Table입니다 out.write_table(lazy_df) # 위의 어떤 형식도 write_table에 전달될 수 있습니다
위의 스니펫을 참조하여 사용 가능한 데이터셋 형식을 확인하십시오. dataset.pandas()
를 호출하는 것은 Pandas가 환경에 설치되어 있음을 요구합니다. 마찬가지로, dataset.polars(...)
는 Polars가 이용 가능해야 합니다.
Lightweight 입력과 출력은 .filesystem()
과 같은 잘 알려진 메소드도 노출합니다. 아래의 스니펫은 비구조화된 파일이 @lightweight
없이 처리될 수 있는 것과 같은 방식으로 처리될 수 있다는 것을 보여줍니다.
Copied!1 2 3 4 5 6 7 8 9 10 11
@lightweight @transform(my_output=Output('/Project/folder/output'), my_input=Input('/Project/folder/input')) def files(my_input, my_output): # 입력 경로의 모든 파일을 반복 for file in my_input.filesystem().ls(): # 입력 파일을 바이너리 모드로 읽기 with my_input.filesystem().open(file.path, "rb") as f1: # 출력 파일을 바이너리 모드로 쓰기 with my_output.filesystem().open(file.path, "wb") as f2: # 입력 파일의 내용을 출력 파일에 쓰기 f2.write(f1.read())
Lightweight 변환과 함께 변환 생성기 사용이 지원됩니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
def create_transforms(): results = [] # 10과 20의 크기로 반복 for size in [10, 20]: # 경량 데코레이터 @lightweight # 변환 데코레이터, 입력 및 출력 경로 설정 @transform( output=Output(f"{root_folder}/demo-outputs/lightweight-polars-{size}"), df=Input(f"{root_folder}/demo-inputs/people-{size}") ) # 경량 Polars 구현 함수 정의 def lightweight_polars(output, df): # Polars 구현을 사용하여 결과 테이블 작성 output.write_table(polars_implementation(df.polars(lazy=True))) # 결과 목록에 경량 Polars 함수 추가 results.append(lightweight(lightweight_polars)) return results # 변환 함수 생성 TRANSFORMS = create_transforms()
다음 기능들은 아직 Lightweight 변환에서 지원되지 않습니다:
Polars는 단일 노드 변환에 최적화된 현대적인 계산 엔진입니다. Polars는 Rust로 작성되었고 파이프라인 실행 중에 네이티브 가속을 활용할 수 있도록 Python 래퍼를 제공합니다.
예를 들어, 이것이 Palantir에서 Lightweight 변환을 벤치마크하는 데 사용하는 Polars 파이프라인입니다:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
def polars_implementation(polars_df): polars_df = polars_df.with_columns( pl.col("id").cast(pl.Int64).alias("id") ) # id 열을 Int64 형식으로 변환하고 열 이름을 id로 지정합니다. reciprocated_follows = ( polars_df .explode("follows") .select([ pl.col("id").alias("id1"), pl.col("follows").cast(pl.Int64).alias("id2"), ]) ) # follows 열을 펼친 다음, id 열을 id1으로, follows 열을 Int64 형식으로 변환하여 id2로 이름을 지정합니다. return ( polars_df .join( reciprocated_follows .join( reciprocated_follows, left_on=["id1", "id2"], right_on=["id2", "id1"], how="inner" ) # reciprocated_follows를 자기 자신과 조인하여 상호 팔로우를 찾습니다. .group_by("id1") .agg(pl.count("id2").alias("reciprocated_follows_count")), # id1 별로 그룹화하여 상호 팔로우 수를 계산합니다. left_on="id", right_on="id1", how="left", ) .drop(["email", "dob", "id1", "follows"]) # 필요 없는 열들(email, dob, id1, follows)을 제거합니다. )
Polars는 Lightweight 변환을 작성하기 위한 데이터 처리 라이브러리로 사용하기를 추천합니다. 가능한 경우, 데이터를 청크 단위로 로드하고 사용 가능한 메모리보다 크기가 큰 데이터셋을 처리할 수 있도록 하는 스트리밍 모드에서 Polars를 사용하는 것이 좋습니다. .polars(lazy=True)
메소드를 호출하여 스트리밍 모드에 접근할 수 있습니다. 현재, UDF와 정렬은 스트리밍 모드에서 완벽하게 지원되지 않습니다.
변환 작업이 오케스트레이션 레이어에 의해 관리되는 컨테이너에서 실행되는 경우, 컨테이너의 메모리 제한이 초과되면 컨테이너가 종료될 수 있습니다. 스트리밍 모드에서 Polars는 이 제한을 인식하지 못하고, 이 제한을 초과할 수 있습니다. 메모리 부족 (OOM) 오류가 발생하면, @lightweight(memory_gb=32)
또는 다른 적절한 값으로 컨테이너의 메모리 제한을 늘려주세요.
변환 API 출처에서 Lightweight API에 대해 더 알아보거나 Lightweight 예시에서 실제 유즈케이스를 고려해보세요.