Python 변환 API는 Pipeline
을 구성하기 위한 클래스와 데코레이터를 제공합니다. 이 페이지에는 사용 가능한 함수에 대한 정보가 포함되어 있으며, 클래스에 대해서도 자세히 알아볼 수 있습니다.
함수 | 설명 |
---|---|
configure ([profile]) | 변환의 구성을 수정하는 데코레이터입니다. |
incremental ([require_incremental, ...]) | 입력값과 결과물을 transforms.api.incremental의 상응하는 것으로 변환하는 데코레이터입니다. |
lightweight ([cpu_cores, memory_mb, memory_gb, gpu_type, container_image, container_tag, container_shell_command]) | 데코레이터를 사용하여 컨테이너 변환에서 실행되도록 변환을 설정합니다. |
transform (ios) | 계산 함수를 Transform 오브젝트로 래핑합니다. |
transform_df (output, inputs) | 래핑된 계산 함수를 데이터프레임 변환으로 등록합니다. |
transform_pandas (output, inputs) | 래핑된 계산 함수를 판다스 변환으로 등록합니다. |
transform_polars (output, inputs) | 래핑된 계산 함수를 폴라스 변환으로 등록합니다. |
configure
transforms.api.configure
(profile=None, allowed_run_duration=None, run_as_user=False)configure
데코레이터는 Transform
을 래핑하는 데 사용해야 합니다:Copied!1 2 3 4 5 6
# 아래 코드는 Python decorator를 사용하여 함수에 메타 데이터를 추가하는 예입니다. >>> @configure(profile=['EXECUTOR_MEMORY_MEDIUM']) # configure 데코레이터를 사용하여 실행자 메모리 설정을 중간으로 설정합니다. ... @transform(...) # transform 데코레이터를 사용하여 함수가 데이터 변환을 수행하도록 합니다. ... def my_compute_function(...): # 사용자 정의 계산 함수를 정의합니다. ... pass # 실제 계산 로직은 이 곳에 작성됩니다.
incremental
transforms.api.incremental
(require_incremental=False, semantic_version=1, snapshot_inputs=None)transforms.api.incremental
상응물로 변환하는 데코레이터.incremental
데코레이터는 Transform
을 래핑하는 데 사용해야 합니다:Copied!1 2 3 4 5 6
# @incremental() 데코레이터는 함수의 실행을 점진적으로 처리합니다. # @transform(...) 데코레이터는 특정 변환 로직을 적용하여 함수의 결과를 변환합니다. >>> @incremental() ... @transform(...) ... def my_compute_function(...): # my_compute_function 함수 정의 ... pass # 구현할 로직이 들어갈 위치
데코레이터는 마지막 빌드 시점의 입력값 상태를 결정하기 위해 출력 데이터셋에서 빌드 기록을 읽습니다. 이 정보는 TransformInput
, TransformOutput
, TransformContext
오브젝트를 점진적 버전으로 변환하는 데 사용됩니다: IncrementalTransformInput
, IncrementalTransformOutput
, IncrementalTransformContext
.
이 데코레이터는 transform_df()
와 transform_pandas()
데코레이터를 감싸는 데도 사용할 수 있습니다. 이 데코레이터들은 입력값에 대해 인수 없이 dataframe()
과 pandas()
를 호출하여 PySpark 및 Pandas DataFrame 객체를 추출합니다. 이는 사용된 읽기 모드가 항상 added
이고 쓰기 모드는 incremental
데코레이터에 의해 결정된다는 것을 의미합니다. 기본값이 아닌 모드 중 어떤 것을 읽거나 쓰려면 transform()
데코레이터를 사용해야 합니다.
추가된 PySpark 또는 Pandas 변환의 출력 행이 추가된 입력 행의 함수인 경우(예: 추가 예시), 기본 모드는 올바른 점진적 변환을 생성합니다.
변환에 SNAPSHOT
트랜잭션이 있는 데이터셋을 입력으로 사용하지만 참조 테이블과 같이 변환을 점진적으로 실행하는 능력을 변경하지 않는 경우, 전체 SNAPSHOT
로 변환을 실행하지 않도록 snapshot_inputs
인수를 검토하세요.
변환에 복잡한 로직(조인, 집계, 고유 등 포함)이 수행되는 경우, 이 데코레이터를 사용하기 전에 점진적 문서화를 읽는 것을 권장합니다.
SNAPSHOT
트랜잭션이 현재 변환의 출력을 무효화하지 않는 입력입니다. 예를 들어, 조회 테이블의 업데이트는 이전에 계산된 출력이 잘못되었다는 것을 의미하지 않습니다. 모든 입력이 이들을 제외하고 추가되거나 새로운 데이터가 없을 때 변환은 점진적으로 실행됩니다. snapshot_inputs를 읽을 때, IncrementalTransformInput
은 입력 데이터셋의 현재 뷰만 노출합니다.TypeError
Transform object
가 아닌 경우.KeyError
Transform object
에 존재하지 않는 경우.lightweight
transforms.api.lightweight
(cpu_cores=2, memory_mb=None, memory_gb=16, gpu_type=None, container_image=None, container_tag=None, container_shell_command=None)lightweight
데코레이터는 Transform
를 감싸는 데 사용해야 합니다:Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
>>> @lightweight ... @transform( ... my_input=Input('/input'), # 입력 경로 설정 ... my_output=Output('/output') # 출력 경로 설정 ... ) ... def compute_func(my_input, my_output): ... my_output.write_pandas(my_input.pandas()) # 입력 데이터를 출력 경로에 pandas 형태로 저장합니다. >>> @lightweight() ... @transform( ... my_input=Input('/input'), # 입력 경로 설정 ... my_output=Output('/output') # 출력 경로 설정 ... ) ... def compute_func(my_input, my_output): ... for file in my_input.filesystem().ls(): # 입력 경로의 파일 목록을 가져옵니다. ... with my_input.filesystem().open(file.path) as f1: # 입력 파일을 엽니다. ... with my_output.filesystem().open(file.path, "w") as f2: # 출력 파일을 엽니다. ... f2.write(f1.read()) # 입력 파일의 내용을 출력 파일에 씁니다. >>> @lightweight(memory_gb=3.5) ... @transform_pandas( ... Output('/output'), # 출력 경로 설정 ... my_input=Input('/input') # 입력 경로 설정 ... ) ... def compute_func(my_input): ... return my_input # 입력 데이터를 그대로 반환합니다. >>> @lightweight(container_image='my-image', container_tag='0.0.1') ... @transform(my_output=Output('ri...my_output')) # 출력 경로 설정 ... def run_data_generator_executable(my_output): ... os.system('$USER_WORKING_DIR/data_generator') # data_generator 실행 파일을 실행합니다. ... my_output.write_table(pd.read_csv('data.csv')) # 생성된 data.csv 파일을 출력 경로에 저장합니다.
라이트웨이트 변환은 단일 노드에서 Spark 없이 실행되는 변환입니다. 라이트웨이트 변환은 작고 중간 크기의 데이터셋에 대해 더 빠르고 비용 효율적입니다. 그러나 라이트웨이트 변환은 Pandas와 파일시스템 API를 포함한 일반 변환의 API의 일부만 지원하면서 동시에 데이터셋에 접근하는 더 많은 방법을 제공합니다. 라이트웨이트 변환에 대한 자세한 정보는 라이트웨이트 문서를 참조하십시오.
이 데코레이터를 사용하려면 foundry-transforms-lib-python
이 의존성으로 추가되어야 합니다.
container_image
, container_tag
또는 container_shell_command
중 어느 것이 설정된 경우 container_image
와 container_tag
모두 설정되어야 합니다. container_shell_command
가 설정되지 않은 경우, Python 환경을 부트스트랩하고 변환에서 지정된 사용자 코드를 실행하는 기본 엔트리 포인트가 사용됩니다.
container_* 인자를 지정하는 것은 자신의 컨테이너(bring-your-own-container, BYOC) 워크플로를 참조하는 것입니다. 이는 사용자의 Code Repositories에서 모든 파일이 런타임에 $USER_WORKING_DIR/user_code
내에 사용 가능하게 되며 Python 환경이 사용 가능하게 됨을 보장합니다.
container_image
와 container_tag
에 의해 지정된 이미지는 Code Repositories의 Artifacts 백업 저장소에서 사용 가능해야 합니다. 자세한 내용은 BYOC 문서를 참조하십시오.
memory_gb
또는 memory_mb
둘 중 하나만 지정할 수 있습니다.transform
transforms.api.transform
(ios)계산 함수를 Transform 객체로 래핑합니다.
transform
데코레이터는 계산 함수에서 Transform
객체를 구성하는 데 사용됩니다. 입력과 결과물에 사용되는 이름은 래핑된 계산 함수의 파라미터 이름이어야 합니다. 계산 시간에, 함수는 입력과 결과물을 TransformInput
과 TransformOutput
객체로 전달받습니다.
Copied!1 2 3 4 5 6 7 8 9 10
>>> @transform( ... first_input=Input('/path/to/first/input/dataset'), # 첫 번째 입력 데이터셋의 경로 ... second_input=Input('/path/to/second/input/dataset'), # 두 번째 입력 데이터셋의 경로 ... first_output=Output('/path/to/first/output/dataset'), # 첫 번째 출력 데이터셋의 경로 ... second_output=Output('/path/to/second/output/dataset'), # 두 번째 출력 데이터셋의 경로 ... ) ... def my_compute_function(first_input, second_input, first_output, second_output): ... # type: (TransformInput, TransformInput, TransformOutput, TransformOutput) -> None ... first_output.write_dataframe(first_input.dataframe()) # 첫 번째 입력 데이터셋을 데이터프레임 형태로 첫 번째 출력 데이터셋에 작성 ... second_output.write_dataframe(second_input.dataframe()) # 두 번째 입력 데이터셋을 데이터프레임 형태로 두 번째 출력 데이터셋에 작성
계산 함수는 결과물에 데이터를 작성하는 것을 담당합니다.
선택적으로, _TransformContext_와 해당 SparkSession은 계산 함수 내에서 접근할 수 있습니다. 이를 사용하여 빈 데이터 프레임을 생성하거나, Spark 설정을 적용하는 등의 작업을 할 수 있습니다. 가능한 경우, SparkSession 객체를 통해 Spark 설정값을 설정하는 대신 기존의 기본 Spark 프로필을 사용하는 것을 권장합니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14
>>> @transform( ... output=Output('/path/to/first/output/dataset'), ... ) ... def my_compute_function(ctx, output): ... # type: (TransformContext, TransformOutput) -> None ... ... # 이 예제에서는 Spark 세션을 사용하여 빈 데이터 프레임을 생성합니다. ... columns = [ ... StructField("col_a", StringType(), True) ... ] ... empty_df = ctx.spark_session.createDataFrame([], schema=StructType(columns)) ... ... # 빈 데이터 프레임을 출력으로 작성합니다. ... output.write_dataframe(empty_df)
transform_df
transforms.api.transform_df
(결과물, 입력값)transform_df
데코레이터는 계산 함수를 받아들이고 반환하는 pyspark.sql.DataFrame
객체를 이용하여 Transform
객체를 구성하는 데 사용됩니다. transform()
데코레이터와 비슷하게, 입력 이름은 계산 함수의 파라미터 이름이 됩니다. 그러나 transform_df
는 위치 인수로 단일 Output
스펙만 허용합니다. 계산 함수의 반환값은 또한 DataFrame
이며 자동으로 단일 출력 데이터셋에 쓰여집니다.Copied!1 2 3 4 5 6 7 8 9
>>> @transform_df( ... Output('/path/to/output/dataset'), # 이름이 없는 Output spec ... first_input=Input('/path/to/first/input/dataset'), ... second_input=Input('/path/to/second/input/dataset'), ... ) ... def my_compute_function(first_input, second_input): ... # type: (pyspark.sql.DataFrame, pyspark.sql.DataFrame) -> pyspark.sql.DataFrame ... # 첫 번째 입력 데이터셋과 두 번째 입력 데이터셋을 합칩니다. ... return first_input.union(second_input)
선택적으로, TransformContext 및 해당 SparkSession도 계산 함수 내에서 접근할 수 있습니다. 이를 사용하여 빈 데이터 프레임을 생성하고, 스파크 구성을 적용하는 등의 작업을 수행할 수 있습니다. 가능한 경우 기존의 기본 스파크 프로필을 사용하는 것이 SparkSession 객체를 통해 스파크 설정 값을 지정하는 것보다 권장됩니다.
Copied!1 2 3 4 5 6 7 8 9 10 11
>>> @transform_df( ... Output('/path/to/output/dataset') ... ) ... def my_compute_function(ctx): ... # type: (TransformContext) -> pyspark.sql.DataFrame ... ... # 이 예제에서는 Spark 세션을 사용하여 빈 데이터 프레임을 생성합니다. ... columns = [ ... StructField("col_a", StringType(), True) ... ] ... return ctx.spark_session.createDataFrame([], schema=StructType(columns))
transform_pandas
transforms.api.transform_pandas
(결과물, 입력값)판다스 라이브러리를 사용하려면, meta.yml
파일에서 pandas
를 run 종속성으로 추가해야 합니다.
transform_pandas
데코레이터는 계산 함수로부터 Transform
객체를 구성하는 데 사용되며, 이 계산 함수는 pandas.DataFrame
객체를 받아들이고 반환합니다. 이 데코레이터는 transform_df()
데코레이터와 유사하지만, pyspark.sql.DataFrame
객체가 계산 전에 pandas.DataFrame
객체로 변환되고, 계산 후에 다시 변환됩니다.
Copied!1 2 3 4 5 6 7 8 9
>>> @transform_pandas( ... Output('/path/to/output/dataset'), # 이름이 지정되지 않은 Output spec ... first_input=Input('/path/to/first/input/dataset'), # 첫 번째 입력 데이터셋 ... second_input=Input('/path/to/second/input/dataset'), # 두 번째 입력 데이터셋 ... ) ... def my_compute_function(first_input, second_input): ... # type: (pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame ... # 첫 번째 입력 데이터프레임과 두 번째 입력 데이터프레임을 연결하여 반환합니다. ... return first_input.concat(second_input)
transform_pandas
는 메모리에 맞출 수 있는 데이터셋에서만 사용해야 합니다. Pandas로 변환하기 전에 먼저 필터링하고자 하는 큰 데이터셋이 있는 경우, transform_df()
데코레이터와 pyspark.sql.SparkSession.createDataFrame()
메소드를 사용하여 변환을 작성해야 합니다.
Copied!1 2 3 4 5 6 7 8 9 10
>>> @transform_df( ... Output('/path/to/output/dataset'), # 이름이 없는 Output spec ... first_input=Input('/path/to/first/input/dataset'), # 첫 번째 입력 데이터셋 경로 ... second_input=Input('/path/to/second/input/dataset'), # 두 번째 입력 데이터셋 경로 ... ) ... def my_compute_function(ctx, first_input, second_input): ... # type: (pyspark.sql.DataFrame, pyspark.sql.DataFrame) -> pyspark.sql.DataFrame ... pd = first_input.filter(first_input.county == 'UK').toPandas() ... # 데이터의 일부분에 대해 pandas 작업을 수행한 후 PySpark DataFrame으로 다시 변환 ... return ctx.spark_session.createDataFrame(pd)
이 코드는 두 개의 입력 데이터셋을 받아 특정 조건('UK' 카운티)에 맞는 데이터만 필터링하여 pandas DataFrame으로 변환하는 함수입니다. 이 pandas DataFrame은 다시 PySpark DataFrame으로 변환되어 반환됩니다.
transform_polars
transforms.api.transform_polars
(결과물, 입력값)이 데코레이터를 사용하려면, foundry-transforms-lib-python
과 polars
를 meta.yml
파일에 실행 종속성으로 추가해야 합니다.
transform_polars
데코레이터는 polars.DataFrame
객체를 수락하고 반환하는 계산 함수에서 변환
객체를 구성하는 데 사용됩니다. 이 데코레이터는 transform_df()
데코레이터와 유사하지만, 사용자 코드에 polars.DataFrame
객체가 전달됩니다.
transform_polars
데코레이터는 @lightweight
데코레이터 위의 얇은 래퍼일 뿐입니다. 이를 사용하면 일반 변환의 일부 기능이 없는 경량 변환이 생성됩니다. 경량 변환에 대한 자세한 정보는 경량 문서를 참조하세요.
Spark 프로필과 일부 다른 변환 기능은 @lightweight
변환과 따라서 @transforms_polars
와 함께 사용할 수 없습니다.
Copied!1 2 3 4 5 6 7 8 9
>>> @transform_polars( ... Output('ri.main.foundry.dataset.out'), # 이름이 없는 Output 스펙 ... first_input=Input('ri.main.foundry.dataset.in1'), # 첫번째 입력 ... second_input=Input('ri.main.foundry.dataset.in2'), # 두번째 입력 ... ) ... def my_compute_function(ctx, first_input, second_input): ... # type: (polars.DataFrame, polars.DataFrame) -> polars.DataFrame ... # 첫번째 입력 데이터 프레임과 두번째 입력 데이터 프레임을 'id'를 기준으로 내부 조인합니다. ... return first_input.join(second_input, on='id', how="inner")