본 번역은 검증되지 않았습니다. AIP를 통해 영문원문으로부터 번역되었습니다.

Python 변환 API

Python 변환 API는 Pipeline을 구성하기 위한 클래스와 데코레이터를 제공합니다. 이 페이지에는 사용 가능한 함수에 대한 정보가 포함되어 있으며, 클래스에 대해서도 자세히 알아볼 수 있습니다.

함수설명
configure([profile])변환의 구성을 수정하는 데코레이터입니다.
incremental([require_incremental, ...])입력값과 결과물을 transforms.api.incremental의 상응하는 것으로 변환하는 데코레이터입니다.
lightweight([cpu_cores, memory_mb, memory_gb, gpu_type, container_image, container_tag, container_shell_command])데코레이터를 사용하여 컨테이너 변환에서 실행되도록 변환을 설정합니다.
transform(ios)계산 함수를 Transform 오브젝트로 래핑합니다.
transform_df(output, inputs)래핑된 계산 함수를 데이터프레임 변환으로 등록합니다.
transform_pandas(output, inputs)래핑된 계산 함수를 판다스 변환으로 등록합니다.
transform_polars(output, inputs)래핑된 계산 함수를 폴라스 변환으로 등록합니다.

configure

transforms.api.configure(profile=None, allowed_run_duration=None, run_as_user=False)

  • 변환의 구성을 수정하는 데코레이터입니다.
  • configure 데코레이터는 Transform을 래핑하는 데 사용해야 합니다:
Copied!
1 2 3 4 5 6 # 아래 코드는 Python decorator를 사용하여 함수에 메타 데이터를 추가하는 예입니다. >>> @configure(profile=['EXECUTOR_MEMORY_MEDIUM']) # configure 데코레이터를 사용하여 실행자 메모리 설정을 중간으로 설정합니다. ... @transform(...) # transform 데코레이터를 사용하여 함수가 데이터 변환을 수행하도록 합니다. ... def my_compute_function(...): # 사용자 정의 계산 함수를 정의합니다. ... pass # 실제 계산 로직은 이 곳에 작성됩니다.

파라미터

  • profile (str 또는 List[str], 선택사항)
    • 사용할 변환 프로필 이름.
  • allowed_run_duration (timedelta, 선택사항)
    • 작업이 실패하기 전에 걸릴 수 있는 시간의 상한선. 주의해서 사용하십시오. 허용된 시간을 설정할 때 데이터 규모 또는 형태의 변화와 같은 변수를 고려하십시오. 지속 시간은 분 단위로만 계산됩니다. 중요: 점진적 변환에 이를 사용할 때 주의하십시오. 스냅샷을 실행할 때 지속 시간이 크게 변경될 수 있습니다.
  • run_as_user (boolean, 선택사항)
    • 변환을 사용자 권한으로 실행할지 여부를 결정합니다. 활성화되면 작업을 실행하는 사용자의 권한에 따라 작업이 다르게 작동할 수 있습니다.

발생


incremental

transforms.api.incremental(require_incremental=False, semantic_version=1, snapshot_inputs=None)

  • 입력값과 결과물을 transforms.api.incremental 상응물로 변환하는 데코레이터.
  • incremental 데코레이터는 Transform을 래핑하는 데 사용해야 합니다:
Copied!
1 2 3 4 5 6 # @incremental() 데코레이터는 함수의 실행을 점진적으로 처리합니다. # @transform(...) 데코레이터는 특정 변환 로직을 적용하여 함수의 결과를 변환합니다. >>> @incremental() ... @transform(...) ... def my_compute_function(...): # my_compute_function 함수 정의 ... pass # 구현할 로직이 들어갈 위치

데코레이터는 마지막 빌드 시점의 입력값 상태를 결정하기 위해 출력 데이터셋에서 빌드 기록을 읽습니다. 이 정보는 TransformInput, TransformOutput, TransformContext 오브젝트를 점진적 버전으로 변환하는 데 사용됩니다: IncrementalTransformInput, IncrementalTransformOutput, IncrementalTransformContext.

이 데코레이터는 transform_df()transform_pandas() 데코레이터를 감싸는 데도 사용할 수 있습니다. 이 데코레이터들은 입력값에 대해 인수 없이 dataframe()pandas()를 호출하여 PySpark 및 Pandas DataFrame 객체를 추출합니다. 이는 사용된 읽기 모드가 항상 added이고 쓰기 모드는 incremental 데코레이터에 의해 결정된다는 것을 의미합니다. 기본값이 아닌 모드 중 어떤 것을 읽거나 쓰려면 transform() 데코레이터를 사용해야 합니다.

추가된 PySpark 또는 Pandas 변환의 출력 행이 추가된 입력 행의 함수인 경우(예: 추가 예시), 기본 모드는 올바른 점진적 변환을 생성합니다.

변환에 SNAPSHOT 트랜잭션이 있는 데이터셋을 입력으로 사용하지만 참조 테이블과 같이 변환을 점진적으로 실행하는 능력을 변경하지 않는 경우, 전체 SNAPSHOT로 변환을 실행하지 않도록 snapshot_inputs 인수를 검토하세요.

변환에 복잡한 로직(조인, 집계, 고유 등 포함)이 수행되는 경우, 이 데코레이터를 사용하기 전에 점진적 문서화를 읽는 것을 권장합니다.

파라미터

  • require_incremental (bool, 선택사항)
    • True인 경우, 변환은 모든 출력 데이터셋에 확정된 트랜잭션이 없는 경우에만 비점진적으로 실행을 거부합니다.
  • semantic_version (int, 선택사항)
    • 기본값은 1입니다. 이 숫자는 변환의 의미적 특성을 나타냅니다. 이는 기존 출력을 무효화하는 방식으로 변환의 로직이 변경될 때마다 변경해야 합니다. 이 숫자를 변경하면 변환의 다음 실행이 비점진적으로 실행됩니다.
  • snapshot_inputs (str의 목록, 선택사항)
    • SNAPSHOT 트랜잭션이 현재 변환의 출력을 무효화하지 않는 입력입니다. 예를 들어, 조회 테이블의 업데이트는 이전에 계산된 출력이 잘못되었다는 것을 의미하지 않습니다. 모든 입력이 이들을 제외하고 추가되거나 새로운 데이터가 없을 때 변환은 점진적으로 실행됩니다. snapshot_inputs를 읽을 때, IncrementalTransformInput은 입력 데이터셋의 현재 뷰만 노출합니다.
  • allow_retention (bool, 선택사항)
    • True인 경우, foundry-retention에 의해 이루어진 삭제는 점진성을 깨뜨리지 않습니다.
  • strict_append (bool, 선택사항)
    • True인 경우, 기본 foundry 트랜잭션 유형은 APPEND가 됩니다. 쓰기 작업은 Parquet 요약 메타데이터나 Hadoop SUCCESS 파일과 같은 보조 파일조차 덮어쓰지 않을 수 있습니다. 모든 Foundry 형식의 점진적 쓰기는 이 모드를 지원해야 합니다.

발생 가능한 오류


lightweight

transforms.api.lightweight(cpu_cores=2, memory_mb=None, memory_gb=16, gpu_type=None, container_image=None, container_tag=None, container_shell_command=None)

  • 컨테이너 변환 인프라 위에서 데코레이션된 변환을 실행하게 하는 데코레이터입니다.
  • lightweight 데코레이터는 Transform를 감싸는 데 사용해야 합니다:
Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 >>> @lightweight ... @transform( ... my_input=Input('/input'), # 입력 경로 설정 ... my_output=Output('/output') # 출력 경로 설정 ... ) ... def compute_func(my_input, my_output): ... my_output.write_pandas(my_input.pandas()) # 입력 데이터를 출력 경로에 pandas 형태로 저장합니다. >>> @lightweight() ... @transform( ... my_input=Input('/input'), # 입력 경로 설정 ... my_output=Output('/output') # 출력 경로 설정 ... ) ... def compute_func(my_input, my_output): ... for file in my_input.filesystem().ls(): # 입력 경로의 파일 목록을 가져옵니다. ... with my_input.filesystem().open(file.path) as f1: # 입력 파일을 엽니다. ... with my_output.filesystem().open(file.path, "w") as f2: # 출력 파일을 엽니다. ... f2.write(f1.read()) # 입력 파일의 내용을 출력 파일에 씁니다. >>> @lightweight(memory_gb=3.5) ... @transform_pandas( ... Output('/output'), # 출력 경로 설정 ... my_input=Input('/input') # 입력 경로 설정 ... ) ... def compute_func(my_input): ... return my_input # 입력 데이터를 그대로 반환합니다. >>> @lightweight(container_image='my-image', container_tag='0.0.1') ... @transform(my_output=Output('ri...my_output')) # 출력 경로 설정 ... def run_data_generator_executable(my_output): ... os.system('$USER_WORKING_DIR/data_generator') # data_generator 실행 파일을 실행합니다. ... my_output.write_table(pd.read_csv('data.csv')) # 생성된 data.csv 파일을 출력 경로에 저장합니다.

라이트웨이트 변환은 단일 노드에서 Spark 없이 실행되는 변환입니다. 라이트웨이트 변환은 작고 중간 크기의 데이터셋에 대해 더 빠르고 비용 효율적입니다. 그러나 라이트웨이트 변환은 Pandas와 파일시스템 API를 포함한 일반 변환의 API의 일부만 지원하면서 동시에 데이터셋에 접근하는 더 많은 방법을 제공합니다. 라이트웨이트 변환에 대한 자세한 정보는 라이트웨이트 문서를 참조하십시오.

이 데코레이터를 사용하려면 foundry-transforms-lib-python이 의존성으로 추가되어야 합니다.

container_image, container_tag 또는 container_shell_command 중 어느 것이 설정된 경우 container_imagecontainer_tag 모두 설정되어야 합니다. container_shell_command가 설정되지 않은 경우, Python 환경을 부트스트랩하고 변환에서 지정된 사용자 코드를 실행하는 기본 엔트리 포인트가 사용됩니다.

container_* 인자를 지정하는 것은 자신의 컨테이너(bring-your-own-container, BYOC) 워크플로를 참조하는 것입니다. 이는 사용자의 Code Repositories에서 모든 파일이 런타임에 $USER_WORKING_DIR/user_code 내에 사용 가능하게 되며 Python 환경이 사용 가능하게 됨을 보장합니다.

container_imagecontainer_tag에 의해 지정된 이미지는 Code Repositories의 Artifacts 백업 저장소에서 사용 가능해야 합니다. 자세한 내용은 BYOC 문서를 참조하십시오.

파라미터

  • cpu_cores (float, 선택적)
    • 변환에 할당할 CPU 코어의 수, 분수일 수 있습니다. 기본값은 2입니다.
  • memory_mb (float, 선택적)
    • 컨테이너에 할당할 메모리 양, MB 단위입니다. memory_gb 또는 memory_mb 둘 중 하나만 지정할 수 있습니다.
  • memory_gb (float, 선택적)
    • 컨테이너에 할당할 메모리 양, GB 단위입니다. 기본값은 16 GB입니다.
  • gpu_type (str, 선택적_)
    • 변환에 할당할 GPU의 유형입니다.
  • container_image (str, 선택적_)
    • 변환의 컨테이너에 사용할 이미지입니다.
  • container_tag (str, 선택적_)
    • 변환의 컨테이너에 사용할 이미지 태그입니다.
  • container_shell_command (str, 선택적_)
    • 컨테이너에서 실행할 쉘 명령입니다. 지정되지 않은 경우, 컨테이너가 시작되면 사용자의 변환을 실행하는 기본값이 생성됩니다.

transform

transforms.api.transform(ios)

  • 계산 함수를 Transform 객체로 래핑합니다.

  • transform 데코레이터는 계산 함수에서 Transform 객체를 구성하는 데 사용됩니다. 입력과 결과물에 사용되는 이름은 래핑된 계산 함수의 파라미터 이름이어야 합니다. 계산 시간에, 함수는 입력과 결과물을 TransformInputTransformOutput 객체로 전달받습니다.

Copied!
1 2 3 4 5 6 7 8 9 10 >>> @transform( ... first_input=Input('/path/to/first/input/dataset'), # 첫 번째 입력 데이터셋의 경로 ... second_input=Input('/path/to/second/input/dataset'), # 두 번째 입력 데이터셋의 경로 ... first_output=Output('/path/to/first/output/dataset'), # 첫 번째 출력 데이터셋의 경로 ... second_output=Output('/path/to/second/output/dataset'), # 두 번째 출력 데이터셋의 경로 ... ) ... def my_compute_function(first_input, second_input, first_output, second_output): ... # type: (TransformInput, TransformInput, TransformOutput, TransformOutput) -> None ... first_output.write_dataframe(first_input.dataframe()) # 첫 번째 입력 데이터셋을 데이터프레임 형태로 첫 번째 출력 데이터셋에 작성 ... second_output.write_dataframe(second_input.dataframe()) # 두 번째 입력 데이터셋을 데이터프레임 형태로 두 번째 출력 데이터셋에 작성

파라미터

계산 함수는 결과물에 데이터를 작성하는 것을 담당합니다.

선택적으로, _TransformContext_와 해당 SparkSession은 계산 함수 내에서 접근할 수 있습니다. 이를 사용하여 빈 데이터 프레임을 생성하거나, Spark 설정을 적용하는 등의 작업을 할 수 있습니다. 가능한 경우, SparkSession 객체를 통해 Spark 설정값을 설정하는 대신 기존의 기본 Spark 프로필을 사용하는 것을 권장합니다.

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 >>> @transform( ... output=Output('/path/to/first/output/dataset'), ... ) ... def my_compute_function(ctx, output): ... # type: (TransformContext, TransformOutput) -> None ... ... # 이 예제에서는 Spark 세션을 사용하여 빈 데이터 프레임을 생성합니다. ... columns = [ ... StructField("col_a", StringType(), True) ... ] ... empty_df = ctx.spark_session.createDataFrame([], schema=StructType(columns)) ... ... # 빈 데이터 프레임을 출력으로 작성합니다. ... output.write_dataframe(empty_df)

transform_df

transforms.api.transform_df(결과물, 입력값)

  • 래핑된 계산 함수를 데이터프레임 변환으로 등록합니다.
  • transform_df 데코레이터는 계산 함수를 받아들이고 반환하는 pyspark.sql.DataFrame 객체를 이용하여 Transform 객체를 구성하는 데 사용됩니다. transform() 데코레이터와 비슷하게, 입력 이름은 계산 함수의 파라미터 이름이 됩니다. 그러나 transform_df는 위치 인수로 단일 Output 스펙만 허용합니다. 계산 함수의 반환값은 또한 DataFrame이며 자동으로 단일 출력 데이터셋에 쓰여집니다.
Copied!
1 2 3 4 5 6 7 8 9 >>> @transform_df( ... Output('/path/to/output/dataset'), # 이름이 없는 Output spec ... first_input=Input('/path/to/first/input/dataset'), ... second_input=Input('/path/to/second/input/dataset'), ... ) ... def my_compute_function(first_input, second_input): ... # type: (pyspark.sql.DataFrame, pyspark.sql.DataFrame) -> pyspark.sql.DataFrame ... # 첫 번째 입력 데이터셋과 두 번째 입력 데이터셋을 합칩니다. ... return first_input.union(second_input)

파라미터

  • output (Output)
    • 변환을 위한 단일 Output 사양.
  • inputs (Input)
    • 명명된 Input 사양으로 구성된 kwargs (키워드 인수).

선택적으로, TransformContext 및 해당 SparkSession도 계산 함수 내에서 접근할 수 있습니다. 이를 사용하여 빈 데이터 프레임을 생성하고, 스파크 구성을 적용하는 등의 작업을 수행할 수 있습니다. 가능한 경우 기존의 기본 스파크 프로필을 사용하는 것이 SparkSession 객체를 통해 스파크 설정 값을 지정하는 것보다 권장됩니다.

Copied!
1 2 3 4 5 6 7 8 9 10 11 >>> @transform_df( ... Output('/path/to/output/dataset') ... ) ... def my_compute_function(ctx): ... # type: (TransformContext) -> pyspark.sql.DataFrame ... ... # 이 예제에서는 Spark 세션을 사용하여 빈 데이터 프레임을 생성합니다. ... columns = [ ... StructField("col_a", StringType(), True) ... ] ... return ctx.spark_session.createDataFrame([], schema=StructType(columns))

transform_pandas

transforms.api.transform_pandas(결과물, 입력값)

  • 판다스 변환으로 감싸진 계산 함수를 등록합니다.

판다스 라이브러리를 사용하려면, meta.yml 파일에서 pandasrun 종속성으로 추가해야 합니다.

transform_pandas 데코레이터는 계산 함수로부터 Transform 객체를 구성하는 데 사용되며, 이 계산 함수는 pandas.DataFrame 객체를 받아들이고 반환합니다. 이 데코레이터는 transform_df() 데코레이터와 유사하지만, pyspark.sql.DataFrame 객체가 계산 전에 pandas.DataFrame 객체로 변환되고, 계산 후에 다시 변환됩니다.

Copied!
1 2 3 4 5 6 7 8 9 >>> @transform_pandas( ... Output('/path/to/output/dataset'), # 이름이 지정되지 않은 Output spec ... first_input=Input('/path/to/first/input/dataset'), # 첫 번째 입력 데이터셋 ... second_input=Input('/path/to/second/input/dataset'), # 두 번째 입력 데이터셋 ... ) ... def my_compute_function(first_input, second_input): ... # type: (pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame ... # 첫 번째 입력 데이터프레임과 두 번째 입력 데이터프레임을 연결하여 반환합니다. ... return first_input.concat(second_input)

transform_pandas는 메모리에 맞출 수 있는 데이터셋에서만 사용해야 합니다. Pandas로 변환하기 전에 먼저 필터링하고자 하는 큰 데이터셋이 있는 경우, transform_df() 데코레이터와 pyspark.sql.SparkSession.createDataFrame() 메소드를 사용하여 변환을 작성해야 합니다.

Copied!
1 2 3 4 5 6 7 8 9 10 >>> @transform_df( ... Output('/path/to/output/dataset'), # 이름이 없는 Output spec ... first_input=Input('/path/to/first/input/dataset'), # 첫 번째 입력 데이터셋 경로 ... second_input=Input('/path/to/second/input/dataset'), # 두 번째 입력 데이터셋 경로 ... ) ... def my_compute_function(ctx, first_input, second_input): ... # type: (pyspark.sql.DataFrame, pyspark.sql.DataFrame) -> pyspark.sql.DataFrame ... pd = first_input.filter(first_input.county == 'UK').toPandas() ... # 데이터의 일부분에 대해 pandas 작업을 수행한 후 PySpark DataFrame으로 다시 변환 ... return ctx.spark_session.createDataFrame(pd)

이 코드는 두 개의 입력 데이터셋을 받아 특정 조건('UK' 카운티)에 맞는 데이터만 필터링하여 pandas DataFrame으로 변환하는 함수입니다. 이 pandas DataFrame은 다시 PySpark DataFrame으로 변환되어 반환됩니다.

파라미터

  • 결과물 (결과물)
    • 변환에 대한 단일 결과물 사양입니다.
  • 입력값 (입력)
    • 명명된 입력 사양으로 구성된 kwargs (키워드 인수)입니다.

transform_polars

transforms.api.transform_polars(결과물, 입력값)

  • 래핑된 계산 함수를 Polars 변환으로 등록합니다.

이 데코레이터를 사용하려면, foundry-transforms-lib-pythonpolarsmeta.yml 파일에 실행 종속성으로 추가해야 합니다.

transform_polars 데코레이터는 polars.DataFrame 객체를 수락하고 반환하는 계산 함수에서 변환 객체를 구성하는 데 사용됩니다. 이 데코레이터는 transform_df() 데코레이터와 유사하지만, 사용자 코드에 polars.DataFrame 객체가 전달됩니다.

transform_polars 데코레이터는 @lightweight 데코레이터 위의 얇은 래퍼일 뿐입니다. 이를 사용하면 일반 변환의 일부 기능이 없는 경량 변환이 생성됩니다. 경량 변환에 대한 자세한 정보는 경량 문서를 참조하세요.

Spark 프로필과 일부 다른 변환 기능@lightweight 변환과 따라서 @transforms_polars와 함께 사용할 수 없습니다.

Copied!
1 2 3 4 5 6 7 8 9 >>> @transform_polars( ... Output('ri.main.foundry.dataset.out'), # 이름이 없는 Output 스펙 ... first_input=Input('ri.main.foundry.dataset.in1'), # 첫번째 입력 ... second_input=Input('ri.main.foundry.dataset.in2'), # 두번째 입력 ... ) ... def my_compute_function(ctx, first_input, second_input): ... # type: (polars.DataFrame, polars.DataFrame) -> polars.DataFrame ... # 첫번째 입력 데이터 프레임과 두번째 입력 데이터 프레임을 'id'를 기준으로 내부 조인합니다. ... return first_input.join(second_input, on='id', how="inner")

파라미터

  • 결과물 (결과물)
    • 변환에 대한 단일 결과물 사양입니다.
  • 입력값 (입력)
    • 명명된 입력 사양으로 구성된 kwargs (키워드 인수)입니다.