Lightweight 변환은 Python 데이터 처리 파이프라인을 실행하기 위한 새로운 백엔드를 제시하며, 여러분이 이미 친숙하게 사용하고 있는 변환 API 대부분을 사용할 수 있게 해줍니다.
개별 컴퓨터의 성능이 향상됨에 따라, 점점 더 많은 데이터 변환이 단일 노드에서 실행될 수 있게 되었습니다. 이는 소형에서 중형 크기의 데이터셋의 경우, 분산 병렬성에 의존하지 않고도 변환이 실행될 수 있다는 것을 의미합니다. 이 접근법은 Spark 실행자의 분산 오케스트레이션과 관련된 오버헤드를 줄이고, Polars (외부) 또는 DuckDB (외부)와 같은 단일 노드 대체품을 데이터 파이프라인 작성에 사용할 수 있게 해줍니다.
Lightweight 변환 기능을 계속 확장하면서, foundry-transforms-lib-python
의 버전 5.400.0 또는 유사한 최신 버전으로 항상 리포지토리를 업그레이드하는 것을 권장합니다:
Lightweight 변환은 컨테이너 오케스트레이션 인프라 위에 구축되어 있으며, 이를 활용하려면 Foundry 등록에 해당 인프라가 존재해야 합니다.
이 예제는 Python 변환 파이프라인에서 lightweight 변환을 어떻게 사용하는지 보여줍니다. 다음과 같이 pandas를 통해 Spark 파이프라인을 사용한다고 가정해 봅시다: @transform_pandas
:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13
from transforms.api import transform_pandas, Input, Output # 판다스를 이용한 데이터 변환 @transform_pandas( Output('/Project/folder/output'), # 출력 경로 설정 df=Input('/Project/folder/input'), # 입력 경로 설정 ) def compute(df): return ( df[df['Name'].str.startswith("A")] # 이름이 A로 시작하는 행만 선택 .loc[:, ['Name', 'Age']] # 이름과 나이 열만 선택 .sort_values(by="Age") # 나이를 기준으로 정렬 )
이를 경량 변환으로 바꾸기 위해서는 다음과 같은 단계를 따라야 합니다:
foundry-transforms-lib-python
을 설치합니다.@lightweight
를 임포트하고 적용합니다:Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
from transforms.api import transform_pandas, Input, Output, lightweight @lightweight @transform_pandas( Output('/Project/folder/output'), df=Input('/Project/folder/input'), ) def compute(df): # Name이 'A'로 시작하는 데이터를 필터링합니다. # Name과 Age 열만 선택한 후, Age 기준으로 정렬합니다. return ( df[df['Name'].str.startswith("A")] .loc[:, ['Name', 'Age']] .sort_values(by="Age") )
작은 데이터에서의 변환 속도를 대략 두 배 빠르게 하기 위해 위에서 보여진 것처럼 가벼운 변환으로 이동합니다.
위에서 보여진 것처럼 @lightweight
는 @transform_pandas
와 호환되거나 .pandas()
메서드만 사용하는 @transform
파이프라인과 호환됩니다.
다음으로, 우리는 변환의 확장성을 향상시키기 위해 Polars를 사용하도록 설정할 수 있습니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
import polars as pl from transforms.api import transform, Input, Output, lightweight @lightweight @transform( # @transform_pandas에서 @transform으로 변경 output=Output('/Project/folder/output'), dataset=Input('/Project/folder/input'), ) def compute(output, dataset): output.write_table(dataset .polars() .filter(pl.col('Name').str.starts_with('A')) # 이름이 'A'로 시작하는 행만 필터링 .select(['Name', 'Age']) # 'Name'과 'Age' 열만 선택 .sort(by='Age') # 'Age' 기준으로 정렬 )
이 파이프라인은 이제 사용 가능한 모든 CPU 코어를 사용하며, 불필요한 작업을 제거하고 Pandas보다 효율적인 알고리즘을 찾을 수 있는 Polars의 쿼리 최적화 엔진도 갖추고 있습니다.
가벼운 변환에 대해 더 알아보려면 가벼운 변환 API 문서로 계속하세요.