데이터 통합PythonLightweight transforms경량 변환 예제들

본 번역은 검증되지 않았습니다. AIP를 통해 영문원문으로부터 번역되었습니다.

경량 변환 예제들

속도 뿐만 아니라 경량 변환은 다목적입니다. 경량 변환은 사용자의 컴퓨트 엔진 선택에 대한 가정을 하지 않습니다. 다음 내용은 Lightweight APITransforms API에 의존하면서 다양한 컴퓨트 엔진이 어떻게 활용될 수 있는지를 보여줍니다. 그 후에, 우리 자신의 도커 이미지를 이용하여 필요한 환경을 포함한 매우 맞춤화된 비파이썬 데이터 처리 애플리케이션을 사용하는 방법을 보여줄 것입니다.

현대 싱글 노드 컴퓨트 엔진들

다음 통합의 기본 원칙은 우리가 탭형 Foundry 데이터셋을 여러 형식으로, 예를 들어 Pandas DataFrame, Arrow Table, Polars DataFrame, 심지어 원시 Parquet 또는 CSV 파일로 접근할 수 있다는 것입니다. 이것은 또한 Lightweight API에서 보여집니다. 메모리에서 Foundry로 테이블을 저장하려고 할 때, 우리는 읽은 형식의 어떤 것으로든지 그것들을 전달할 수 있습니다.

Ibis 사용하기

대부분의 현대 컴퓨트 엔진들은 분산 데이터 시스템 개념을 받아들이며 따라서 산업 표준 오픈 소스 소프트웨어에서 운영합니다. 메모리에서 테이블을 저장하는 공식 표준은 Arrow (외부)입니다. 시작하기 위해, 우리는 Ibis (외부) (DuckDB 백엔드와 함께)를 사용할 것인데, 이것은 내부적으로 Arrow 형식을 사용합니다. 이 경우, 우리는 my_input.arrow() 호출을 통해 Foundry 데이터셋을 pyarrow.Table (외부) 오브젝트로 읽을 수 있고, 그것을 변환한 다음 my_output.write_table(...)로 변환된 오브젝트를 Foundry로 다시 쓸 수 있습니다. 다음 예제를 고려해보세요:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import ibis from transforms.api import lightweight, transform, Output, Input @lightweight @transform(my_input=Input('my-input'), my_output=Output('my-output')) def my_ibis_transform(my_input, my_output): ibis_client = ibis.duckdb.connect(':memory:') # Foundry 데이터셋을 pyarrow.Table 객체로 가져옵니다. table = ibis_client.read_in_memory(my_input.arrow()) # 데이터 변환을 실행합니다. results = ( table .filter(table['name'].like('John%')) .execute() ) # pyarrow.Table 객체를 Foundry 데이터셋에 저장합니다. my_output.write_table(results)

Apache DataFusion 및 DuckDB 사용하기

때로는 데이터 역직렬화 단계를 생략하고 데이터셋의 원시 기본 파일을 직접 컴퓨팅 엔진으로 전달하는 것이 더 쉽습니다. 디스크에 있는 파일의 경로를(요청시 다운로드됨) my_input.path()를 호출하여 얻을 수 있습니다. Foundry에 원시 파일을 다시 작성할 때, 다음 두 가지 제한 사항을 염두에 두어야 합니다:

  • 이 API를 통해 Foundry 데이터셋에 저장할 수 있는 파일은 Parquet 파일만 가능합니다.
  • 파일은 my_output.path_for_write_table 값에 위치한 폴더에 배치해야 합니다.

두 기준이 모두 충족되면, 이 폴더의 경로를 호출하여 write_table을 사용할 수 있습니다: my_output.write_table(my_output.path_for_write_table). 이를 플랫폼 내에서 DataFusion (외부)을 사용하는 방법을 보여주는 다음 코드 스니펫을 고려해보세요.

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import datafusion from datafusion import lit from datafusion.functions import col, starts_with from transforms.api import lightweight, transform, Output, Input @lightweight # 가벼운 변환 데코레이터 @transform(my_input=Input('my-input'), my_output=Output('my-output')) # 입력과 출력을 정의하는 변환 데코레이터 def my_datafusion_transform(my_input, my_output): ctx = datafusion.SessionContext() # datafusion 세션 컨텍스트를 생성 table = ctx.read_parquet(my_input.path()) # 입력으로부터 parquet 파일을 읽어들여서 테이블로 만듦 my_output.write_table( table .filter(starts_with(col("name"), lit("John"))) # "name"이라는 칼럼의 값이 "John"으로 시작하는 행만 필터링 .to_arrow_table() # 결과 테이블을 arrow 테이블 형식으로 변환 )

같은 접근법을 DuckDB (외부)에도 사용할 수 있으며, 다음 스니펫에서 보여줍니다. 폴더에서 모든 Parquet 파일을 읽어야 한다는 점을 유의해야 합니다.

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import duckdb from transforms.api import lightweight, transform, Output, Input # DuckDB를 이용한 데이터 변환 함수 정의 @lightweight @transform(my_input=Input('my-input'), my_output=Output('my-output')) def my_duckdb_transform(my_input, my_output): # 메모리 내에서 DuckDB 연결하고 쿼리 실행 duckdb.connect(database=':memory:').execute(f""" COPY ( SELECT * FROM parquet_scan('{my_input.path()}/**/*.parquet') # Parquet 파일 읽기 WHERE Name LIKE 'John%' # 이름이 'John'으로 시작하는 데이터 필터링 ) TO '{my_output.path_for_write_table}' (FORMAT 'parquet', PER_THREAD_OUTPUT TRUE) # 병렬로 쓰레드별로 Parquet 파일 작성하여 성능 최적화 """) # 결과 테이블 저장 my_output.write_table(my_output.path_for_write_table)

cuDF 활용하기

pandas.DataFrame를 사용하여 통합을 달성할 수도 있습니다. 다음 코드 조각은 cuDF (외부)를 가볍게 변환하는 데 사용하는 예입니다. 이를 통해 가능한 경우 GPU에서 고도로 병렬화된 방식으로 Pandas 코드를 실행할 수 있습니다.

Copied!
1 2 3 4 5 6 7 8 9 10 11 @lightweight(gpu_type='NVIDIA_T4', cpu_cores=4, memory_gb=32) @transform(my_input=Input('my-input'), my_output=Output('my-output')) def my_cudf_transform(my_input, my_output): import cudf # CI 중에는 CUDF를 가져오지 마십시오. 런타임에만 가져옵니다. df = cudf.from_pandas(my_input.pandas()[['name']]) # 이름이 'John'으로 시작하는 행만 필터링합니다. filtered_df = df[df['name'].str.startswith('John')] # 이름을 기준으로 정렬합니다. sorted_df = filtered_df.sort_values(by='name') # 정렬된 데이터프레임을 출력으로 저장합니다. my_output.write_table(sorted_df)

위 코드 조각은 Foundry 등록이 NVIDIA T4 GPU로 구비되어 있고 프로젝트에서 리소스 큐를 통해 사용할 수 있다고 가정합니다.

사용자 정의 컨테이너 워크플로

대부분의 계산은 Python만으로도 쉽게 표현할 수 있습니다. 그러나 비Python 실행 엔진이나 스크립트를 사용하고자 하는 경우도 있습니다. 예를 들어 F# 애플리케이션, 오래된 VBA 스크립트 또는 종료된 버전의 Python이 될 수 있습니다. 사용자 정의 컨테이너(BYOC) 워크플로를 사용하면 가능성이 무한합니다. 간단히 말해서 BYOC는 애플리케이션이 필요로 하는 모든 특정 의존성 및/또는 바이너리를 포함한 로컬 도커 이미지를 생성하고 이 이미지 위에 경량 변환을 실행할 수 있도록 합니다.

다음은 Foundry에서 COBOL 변환을 실행하는 방법의 예입니다. 간단하게 이해하기 위해 이 예제에서는 변환 내에서 COBOL 프로그램을 컴파일합니다. 대신 프로그램을 미리 컴파일하고 바이너리 실행 파일을 도커 이미지로 복사할 수도 있습니다. 먼저, 컨테이너화된 워크플로 활성화를 한 다음, 이러한 이미지 요구사항을 따라 로컬에서 도커 이미지를 빌드합니다.

다음은 간단한 예시를 위한 최소한의 Dockerfile입니다:

Copied!
1 2 3 4 5 6 7 8 9 10 # 기본 이미지로 ubuntu 최신 버전을 사용합니다. FROM ubuntu:latest # 패키지 업데이트 및 필요한 프로그램 설치: coreutils, curl, sed, build-essential, gnucobol RUN apt update && apt install -y coreutils curl sed build-essential gnucobol # 사용자 추가 (uid: 5001) RUN useradd --uid 5001 user # 추가한 사용자로 전환 USER 5001

Docker 이미지에서 가볍게 작동하려면 Python이 설치될 필요가 없습니다.

그런 다음, 이미지를 빌드하고 Foundry에 업로드합니다. 이를 위해 Artifacts 저장소를 생성하고 이미지에 태그를 붙이고 푸시하는 방법을 따라가십시오. 이 예에서 이미지는 my-image라는 이름과 0.0.1이라는 버젼으로 태그가 지정되어 있습니다. BYOC 가벼운 변환을 작성하기 전의 마지막 단계는 이 Artifacts 저장소를 Code Repository의 로컬 백업 저장소로 추가하는 것입니다.

다음 예는 COBOL 스크립트를 고려한 것으로, 이 스크립트는 CSV 파일을 생성하며, 그 소스 코드는 내 Code Repository의 resources/data_generator.cbl에 위치해 있습니다.

마지막 단계는 데이터 처리 프로그램을 Foundry에 연결할 수 있는 가벼운 변환을 작성하는 것입니다. 다음 스니펫은 Python API를 통해 데이터셋에 접근하는 방법과 질문의 이미지 내에 포함된 임의의 실행 파일을 사용하는 방법을 보여줍니다. COBOL 실행 파일을 호출하려면 Python 표준 라이브러리의 함수를 사용하십시오(이 경우, os.system(...)).

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from transforms.api import Output, transform, lightweight @lightweight(container_image='my-image', container_tag='0.0.1') @transform(my_output=Output('my-output')) def compile_cobol_data_generator(my_output): """Conda를 통해 얻기 어려운 종속성을 처리하는 방법을 보여줍니다.""" # Cobol 프로그램 컴파일 # (src 폴더의 모든 것은 $USER_WORKING_DIR/user_code 에서 사용 가능합니다) os.system("cobc -x -free -o data_generator $USER_WORKING_DIR/user_code/resources/data_generator.cbl") # data.csv를 생성하고 채우기 위해 프로그램 실행 os.system('$USER_WORKING_DIR/data_generator') # 결과를 Foundry에 저장 my_output.write_table(pd.read_csv('data.csv'))
미리보기

BYOC 워크플로에 대한 미리보기는 아직 지원되지 않습니다.

빌드 버튼을 사용하면 Docker 이미지에서 컨테이너를 인스턴스화하고 지정된 명령을 호출합니다. 리소스 할당, 로그, Foundry와의 통신, 권한 강제, 감사 가능성은 모두 자동으로 처리됩니다.

점진적 워크플로

점진적 경량 변환은 foundry-transforms-lib-python 라이브러리의 0.556.0 버전에서 지원됩니다.

점진적 경량 변환을 작성하려면, @lightweight 데코레이터 다음에 @incremental 데코레이터를 사용하십시오. 다음 코드는 예를 보여줍니다:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 # transforms.api에서 필요한 함수들을 가져옵니다. from transforms.api import incremental, Input, lightweight, Output, transform # 함수가 lightweight(경량화된) 형태라는 것을 표시합니다. @lightweight() # 함수가 incremental(증분적) 형태라는 것을 표시합니다. # require_incremental=True는 증분적 처리가 필요함을 의미합니다. @incremental(require_incremental=True) # 데이터 변환 함수를 정의합니다. "my-input"으로부터 입력을 받고, "my-output"으로 출력을 합니다. @transform(my_input=Input("my-input"), my_output=Output('my-output')) def my_incremental_transform(my_input, my_output): # 입력받은 데이터를 pandas 형태로 변환하고, # 그 결과를 "my-output"에 쓰는 작업을 수행합니다. # 여기서 mode="added"는 새로 추가된 데이터만을 처리하겠다는 의미입니다. my_output.write_pandas(my_input.pandas(mode="added"))

다음 단계

라이트웨이트 변환에 대해 더 알아보려면, Foundry 배포의 출처 리소스 Marketplace 스토어에서 라이트웨이트 예제 Marketplace 제품을 설치하기 또는 변환 API 출처로 이동하기를 자유롭게 해보세요.