본 번역은 검증되지 않았습니다. AIP를 통해 영문원문으로부터 번역되었습니다.

변환과 파이프라인

Python에서 transforms.api.Transform은 데이터셋을 계산하는 방법에 대한 설명입니다. 다음을 설명합니다:

  • 입력 및 출력 데이터셋
  • 입력 데이터셋을 출력 데이터셋으로 변환하는 데 사용되는 코드(이를 계산 함수라고 하겠습니다), 그리고
  • configure() 데코레이터에 의해 정의된 추가 구성(런타임에 사용할 사용자 지정 Transforms 프로필 포함)

입력 및 출력 데이터셋과 변환 코드는 Transform 오브젝트에 포함되어 있고, 그런 다음 Pipeline에 등록됩니다. 수동으로 Transform 오브젝트를 구성해서는 안 됩니다. 대신 아래에서 설명하는 데코레이터 중 하나를 사용해야 합니다.

데이터 변환은 pyspark.sql.DataFrame 오브젝트와 파일 측면에서도 표현될 수 있다는 점을 기억하는 것이 중요합니다. DataFrame 오브젝트에 의존하는 변환의 경우 변환 데코레이터를 사용하고 입력 데이터셋이 포함된 DataFrame에 대한 메서드를 명시적으로 호출하거나 DataFrame 변환 데코레이터를 사용할 수 있습니다. 파일에 의존하는 변환의 경우 변환 데코레이터를 사용한 다음 데이터셋 내의 파일에 액세스하면 됩니다. 데이터 변환에 Pandas 라이브러리만 사용할 경우 Pandas 변환 데코레이터를 사용할 수 있습니다.

하나의 Python 파일에 여러 개의 Transform 오브젝트를 정의할 수 있습니다. 또한 모든 변환은 현재 트랜잭션 유형 SNAPSHOT으로 실행됩니다.

변환

데코레이터

transforms.api.transform() 데코레이터는 DataFrame 오브젝트 또는 파일에 의존하는 데이터 변환을 작성할 때 사용할 수 있습니다. 이 데코레이터는 키워드 인수로 여러 개의 transforms.api.Inputtransforms.api.Output 사양을 받아들입니다. Foundry 빌드 중에 이러한 사양은 각각 transforms.api.TransformInputtransforms.api.TransformOutput 오브젝트로 해결됩니다. 이러한 TransformInputTransformOutput 오브젝트는 계산 함수 내에서 데이터셋에 액세스할 수 있도록 합니다.

입력 및 출력에 사용되는 키워드 이름은 래핑된 계산 함수의 매개변수 이름과 일치해야 합니다.

transform() 데코레이터를 사용하여 Transform 오브젝트를 생성하는 간단한 예제를 살펴보겠습니다. /examples/students_hair_eye_color라는 작은 샘플 데이터셋을 사용합니다. 다음은 데이터셋의 미리보기입니다:

>>> students_input = foundry.input('/examples/students_hair_eye_color') # '/examples/students_hair_eye_color' 경로에서 학생들의 데이터를 가져옵니다.
>>> students_input.dataframe().sort('id').show(n=3) # 가져온 데이터를 'id' 기준으로 정렬하고, 상위 3개의 데이터만 보여줍니다.
+---+-----+-----+----+
| id| hair|  eye| sex|
+---+-----+-----+----+
|  1|Black|Brown|Male| # 남성 학생 1번은 머리 색깔이 검정색, 눈 색깔이 갈색입니다.
|  2|Brown|Brown|Male| # 남성 학생 2번은 머리 색깔이 갈색, 눈 색깔이 갈색입니다.
|  3|  Red|Brown|Male| # 남성 학생 3번은 머리 색깔이 빨강색, 눈 색깔이 갈색입니다.
+---+-----+-----+----+
only showing top 3 rows # 상위 3개의 데이터만 보여줍니다.

이제, 입력값으로 /examples/students_hair_eye_color를 사용하고 결과물로 /examples/hair_eye_color_processed를 생성하는 Transform을 정의할 수 있습니다:

Copied!
1 2 3 4 5 6 7 8 9 10 from transforms.api import transform, Input, Output @transform( hair_eye_color=Input('/examples/students_hair_eye_color'), # students_hair_eye_color 데이터를 불러옵니다. processed=Output('/examples/hair_eye_color_processed') # 처리된 데이터를 저장할 위치를 지정합니다. ) def filter_hair_color(hair_eye_color, processed): # type: (TransformInput, TransformOutput) -> None # 머리색이 갈색인 학생들만 필터링하는 함수입니다. filtered_df = hair_eye_color.dataframe().filter(hair_eye_color.dataframe().hair == 'Brown') # 머리색이 갈색인 학생들만 필터링하여 새로운 데이터프레임을 만듭니다. processed.write_dataframe(filtered_df) # 만들어진 데이터프레임을 저장합니다.

"hair_eye_color"의 입력 이름과 "processed"의 출력 이름이 filter_hair_color 계산 함수에서 매개변수 이름으로 사용됩니다. 또한, filter_hair_colorTransformInput에서 dataframe() 메소드를 사용하여 DataFrame을 읽습니다. 이 DataFramefilter()를 사용하여 필터링되며, 이는 일반적인 PySpark 함수입니다. 이 필터링된 DataFramewrite_dataframe() 메소드를 사용하여 "processed"라는 출력에 기록됩니다.

TransformInput에 의해 반환된 DataFrame 객체는 일반적인 PySpark DataFrame입니다. PySpark를 사용하는 방법에 대한 자세한 정보는 온라인으로 제공되는 Spark Python API 문서를 참조할 수 있습니다.

데이터 변환에 DataFrame 객체 대신 파일에 대한 액세스가 필요한 경우, 파일 액세스 섹션을 참조하십시오.

다중 출력

단일 입력 데이터셋을 여러 부분으로 나누어야 하는 경우에 유용한 것이 다중 출력 변환입니다. 다중 출력 변환은 transforms.api.transform() 데코레이터와 함께 사용되어야 합니다. /examples/students_hair_eye_color 데이터셋을 다시 생각해 보세요:

>>> students_input = foundry.input('/examples/students_hair_eye_color')
# 학생들의 머리카락 색과 눈 색에 대한 정보를 가져옵니다.
>>> students_input.dataframe().sort('id').show(n=3)
+---+-----+-----+----+
| id| hair|  eye| sex|
+---+-----+-----+----+
|  1|Black|Brown|Male|
|  2|Brown|Brown|Male|
|  3|  Red|Brown|Male|
+---+-----+-----+----+
# 상위 3개의 데이터만 보여줍니다.

이제 입력을 분할하기 위해 transform() 데코레이터에 여러 Output 사양을 전달할 수 있습니다:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 from transforms.api import transform, Input, Output @transform( hair_eye_color=Input('/examples/students_hair_eye_color'), # 학생들의 머리와 눈 색에 대한 정보를 받아옵니다. males=Output('/examples/hair_eye_color_males'), # 남성 데이터를 출력할 경로를 설정합니다. females=Output('/examples/hair_eye_color_females'), # 여성 데이터를 출력할 경로를 설정합니다. ) def brown_hair_by_sex(hair_eye_color, males, females): # type: (TransformInput, TransformOutput, TransformOutput) -> None brown_hair_df = hair_eye_color.dataframe().filter(hair_eye_color.dataframe().hair == 'Brown') # 머리 색이 갈색인 데이터만 필터링합니다. males.write_dataframe(brown_hair_df.filter(brown_hair_df.sex == 'Male')) # 성별이 남성인 데이터만 필터링하여 출력합니다. females.write_dataframe(brown_hair_df.filter(brown_hair_df.sex == 'Female')) # 성별이 여성인 데이터만 필터링하여 출력합니다.

한 번만 갈색 머리 필터를 적용한 후, 필터링된 데이터셋을 공유해 여러 출력 데이터셋을 생성할 수 있음을 주목하세요.

DataFrame 변환 데코레이터

Python에서 데이터 변환은 대체로 DataFrame 객체를 읽고, 처리하고, 쓰는 과정입니다. 만약 데이터 변환이 DataFrame 객체에 의존한다면, transforms.api.transform_df() 데코레이터를 사용할 수 있습니다. 이 데코레이터는 DataFrame 객체를 주입하고, 계산 함수가 DataFrame을 반환하게 합니다. 또는, 보다 일반적인 transform() 데코레이터를 사용하고 명시적으로 dataframe() 메소드를 호출하여 입력 데이터셋을 포함한 DataFrame에 접근할 수 있습니다. 이 transform() 데코레이터는 더 강력한 transforms.api.TransformInputtransforms.api.TransformOutput 객체를 주입하며, DataFrame 객체 대신 사용합니다. transform_df() 데코레이터는 transforms.api.Input 사양의 수를 키워드 인자로 받아들이고, 위치 인자로 단일한 transforms.api.Output 사양을 받아들입니다. Python에서 요구하는 것처럼, 위치 Output 인자는 먼저 나타나야 하며, 그 뒤에 키워드 Input 인자가 따라야 합니다. transform_df() 데코레이터를 사용하여 Transform 객체를 생성하는 간단한 예를 살펴봅시다. 여기서는 위에서 언급한 작은 샘플 데이터셋인 /examples/students_hair_eye_color를 사용하겠습니다. 다음은 데이터셋의 미리보기입니다:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 >>> students_input = foundry.input('/examples/students_hair_eye_color') # foundry의 input 함수를 사용하여 '/examples/students_hair_eye_color' 경로의 데이터를 students_input 변수에 저장합니다. >>> students_input.dataframe().sort('id').show(n=3) # students_input에 저장된 데이터를 데이터프레임 형태로 변환하고, 'id' 기준으로 정렬한 후 상위 3개의 행을 출력합니다. +---+-----+-----+----+ | id| hair| eye| sex| +---+-----+-----+----+ | 1|Black|Brown|Male| | 2|Brown|Brown|Male| | 3| Red|Brown|Male| +---+-----+-----+----+ # id, 머리색(hair), 눈색(eye), 성별(sex) 정보를 가진 학생 데이터 중에서 상위 3명의 데이터만 보여주고 있습니다. # 이 데이터는 오직 상위 3행만을 보여주는 예시입니다.

이제 위의 Transform decorator 섹션의 예제를 수정하여 transform_df() 데코레이터를 사용하도록 하겠습니다. 입력으로 /examples/students_hair_eye_color을 사용하고 결과물로 /examples/hair_eye_color_processed를 생성하는 Transform을 정의합니다:

Copied!
1 2 3 4 5 6 7 8 9 10 from transforms.api import transform_df, Input, Output @transform_df( Output('/examples/hair_eye_color_processed'), hair_eye_color=Input('/examples/students_hair_eye_color') ) def filter_hair_color(hair_eye_color): # type: (pyspark.sql.DataFrame) -> pyspark.sql.DataFrame # 갈색 머리인 학생들의 데이터만 필터링 return hair_eye_color.filter(hair_eye_color.hair == 'Brown')

"hair_eye_color"의 입력 이름은 filter_hair_color 계산 함수의 파라미터 이름으로 사용됩니다. 또한, Python은 위치 인수가 키워드 인수 앞에 와야 하므로, Output 인수는 모든 Input 인수 앞에 나타납니다.

Pandas Transform 데코레이터

경고

transform_pandas 데코레이터는 메모리에 맞는 데이터셋에서만 사용해야 합니다. 그렇지 않으면, transform_df 데코레이터를 사용하여 데이터 변환을 작성하고, 입력 데이터셋을 Pandas 데이터프레임으로 변환하기 전에 필터링해야 합니다. toPandas 메소드를 사용합니다.

meta.yaml에서 PyArrow를 의존성으로 추가하여 toPandas 메소드를 사용하는 것이 좋습니다. 이렇게 하면 Arrow와 함께 Pandas DataFrame 변환 최적화가 가능합니다.

데이터 변환이 Pandas 라이브러리에만 의존하는 경우, transforms.api.transform_pandas() 데코레이터를 사용할 수 있습니다. Pandas 라이브러리를 사용하려면, meta.yml 파일에서 pandas실행 의존성으로 추가해야 합니다. 자세한 정보는 meta.yml 파일 설명 섹션을 참조하십시오.

transform_pandas() 데코레이터는 transform_df() 데코레이터와 유사하지만, transform_pandas()는 입력 데이터셋을 pandas.DataFrame 객체로 변환하고 pandas.DataFrame의 반환 유형을 받아들입니다.

transform_pandas() 데코레이터는 키워드 인수로 여러 transforms.api.Input 사양을 받아들이고, 위치 인수로 단일 transforms.api.Output 사양을 받아들입니다. Python이 요구하는 대로, 위치 Output 인수는 먼저 나타나고, 그 다음에 키워드 Input 인수가 이어집니다.

transform_pandas() 데코레이터를 사용하여 Transform 오브젝트를 생성하는 간단한 예제를 살펴보겠습니다. 위에서 사용한 동일한 샘플 데이터셋인 /examples/students_hair_eye_color을 사용할 것입니다. 데이터셋의 미리보기는 다음과 같습니다:

# 학생들의 머리색과 눈색 데이터를 불러옵니다.
>>> students_input = foundry.input('/examples/students_hair_eye_color')
# 데이터프레임을 id를 기준으로 정렬하고 상위 3개의 데이터를 출력합니다.
>>> students_input.dataframe().sort('id').show(n=3)
+---+-----+-----+----+
| id| hair|  eye| sex|
+---+-----+-----+----+
|  1|Black|Brown|Male|
|  2|Brown|Brown|Male|
|  3|  Red|Brown|Male|
+---+-----+-----+----+
# 상위 3개의 데이터만 출력합니다.

이제 /examples/students_hair_eye_color를 입력으로 사용하고 /examples/hair_eye_color_processed를 결과물로 생성하는 Transform을 정의할 수 있습니다:

Copied!
1 2 3 4 5 6 7 8 9 10 from transforms.api import transform_pandas, Input, Output @transform_pandas( Output('/examples/hair_eye_color_processed'), hair_eye_color=Input('/examples/students_hair_eye_color') ) def filter_hair_color(hair_eye_color): # type: (pandas.DataFrame) -> pandas.DataFrame # 머리 색이 갈색인 데이터만 반환 return hair_eye_color[hair_eye_color['hair'] == 'Brown']

"hair_eye_color"이라는 입력 이름이 filter_hair_color 계산 함수의 파라미터 이름으로 사용됩니다. 또한, Python은 위치 인수가 키워드 인수 앞에 오도록 요구하기 때문에, Output 인수는 모든 Input 인수 앞에 나타납니다. 이 예제는 pandas.DataFrame를 받아 반환하는 계산 함수에서 Transform을 생성합니다, 이는 위의 DataFrame Transform 데코레이터 섹션에서의 예제처럼 pyspark.sql.DataFrame 오브젝트가 아닙니다. Pandas DataFrames를 PySpark DataFrames로 변환하기 위해 createDataFrame() 메서드를 사용할 수 있습니다—이 메서드를 Transform contextspark_session 속성에 호출하십시오.

Transform context

데이터 변환이 입력 데이터셋 외의 것에 의존해야 하는 경우가 있을 수 있습니다. 예를 들어, 현재 Spark 세션에 액세스하거나 외부 서비스에 연락해야 하는 변환이 필요할 수 있습니다. 이러한 경우, 변환에 transforms.api.TransformContext 오브젝트를 주입할 수 있습니다.

TransformContext 오브젝트를 주입하려면, 계산 함수는 ctx라는 파라미터를 수용해야 합니다. 이는 또한 입력값이나 결과물이 ctx라는 이름을 가질 수 없음을 의미합니다. 예를 들어, Python 데이터 구조에서 DataFrame을 생성하려면 TransformContext 오브젝트를 사용할 수 있습니다:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 from transforms.api import transform_df, Output # 데이터 프레임 생성 함수 정의 @transform_df( Output('/examples/context') ) def generate_dataframe(ctx): # type: (TransformContext) -> pyspark.sql.DataFrame # 스파크 세션을 이용해 데이터 프레임 생성 return ctx.spark_session.createDataFrame([ ['a', 1], ['b', 2], ['c', 3] ], schema=['letter', 'number'])

변환 로직 레벨 버저닝

경고

TLLV가 정확하게 작동하려면, 코드는 모든 import를 모듈 레벨에서 선언해야 하며, 다른 모듈의 객체를 패치하거나 수정하려고 시도해서는 안 됩니다. 프로젝트에서 이 경우가 아니면 TLLV를 비활성화해야 합니다. 아래의 코드 예제를 참조하여 자세한 정보를 얻으십시오. TLLV는 기본적으로 활성화됩니다. TLLV를 비활성화하려면 transformsPython 구성에서 tllv를 false로 설정하십시오. 이 구성은 Transforms Python 하위 프로젝트의 build.gradle 파일 안에 있습니다.

transformsPython {
    tllv false  # tllv를 false로 설정합니다. 
}

이 코드는 Python 변환 설정에 관련된 코드입니다. 여기서 'tllv'는 일반적으로 변환을 제어하는 데 사용하는 설정의 이름일 수 있습니다. 'false'는 이 설정을 비활성화한다는 것을 의미합니다.

Transform의 버전은 두 개의 변환 버전을 비교할 때 논리적으로 노후화를 고려하는 데 사용되는 문자열입니다. 입력값이 변경되지 않았고 변환의 버전이 변경되지 않은 경우, 변환의 출력은 최신 상태입니다. 버전이 변경되면, 변환의 출력은 무효화되고 다시 계산됩니다.

기본적으로, 변환의 버전에는 다음이 포함됩니다:

  • 변환이 정의된 모듈,
  • 변환에 의존하는 모든 모듈, 그리고
  • 프로젝트 의존성

이 중 어느 것이 변경되면 버전 문자열이 변경됩니다. 나열된 부분에 포함되지 않은 파일에서 변경이 발생하면 출력을 무효화하려는 경우, transformsPython 구성에서 tllvFiles를 설정합니다. 예를 들어, 파일에 대한 구성을 읽고 구성이 변경될 때 출력을 무효화하려는 경우 사용됩니다.

transformsPython {
    tllvFiles = [
        'path/to/file/you/want/to/include/relative/to/project/directory'  // '프로젝트 디렉토리에 대한 상대적인 경로를 포함하고자 하는 파일의 경로'
    ]
}

프로젝트 의존성 버전이 변경되었을 때 결과물이 무효화되지 않도록 하려면 tllvIncludeDeps를 false로 설정하십시오.

transformsPython {
    tllvIncludeDeps false  # tllvIncludeDeps를 false로 설정합니다.
}

다음은 유효하고 무효한 가져오기에 대한 코드 예시를 고려해보십시오:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 # 모듈의 맨 위에서만 가져오기(import)를 하는 경우에는 걱정할 필요가 없습니다. from transforms.api import transform_df, Input, Output from myproject.datasets import utils from myproject.testing import test_mock as tmock import importlib # 모듈 범위에서만 `__import__` 또는 `importlib`을 사용하는 것은 괜찮습니다. logging = __import__('logging') my_compute = importlib.import_module('myproject.compute') def helper(x): # 이 부분은 유효하지 않습니다. 함수 또는 클래스 메서드에서 가져오기를 사용하려면 TLLV를 비활성화해야 합니다. # 모든 가져오기는 모듈 범위에서 이루어져야 합니다. import myproject.helpers as myhelp return myhelp.round(x) @transform_df( Output("/path/to/output/dataset"), my_input=Input("/path/to/input/dataset"), ) def my_compute_function(my_input): # 이 부분은 유효하지 않습니다. 함수에서 가져오기를 사용하려면 TLLV를 비활성화해야 합니다! ihelper = __import__('myproject.init_helper') my_functions = importlib.import_module('myproject.functions') return my_input

확장 모듈을 사용하고 있다면 TLLV를 비활성화해야 합니다.

파이프라인

저장소 내의 각 Transforms Python 서브 프로젝트는 단일한 transforms.api.Pipeline 객체를 노출합니다. 이 Pipeline 객체는 다음을 수행하는 데 사용됩니다:

  1. 빌드 방법에 대한 지침과 함께 Foundry에 데이터셋을 등록하고,
  2. Foundry 빌드 도중 주어진 데이터셋을 빌드하는 데 책임이 있는 transforms.api.Transform 객체를 찾아 실행합니다.

엔트리 포인트

Python 변환을 실행하는 런타임은 프로젝트의 Pipeline을 찾을 수 있어야 합니다. Pipeline을 내보내려면, 이를 Transforms Python 서브 프로젝트의 setup.py 파일의 entry_points 인자에 추가합니다. 엔트리 포인트에 대한 자세한 정보는 setuptools 라이브러리 문서를 참조할 수 있습니다. 기본적으로 각 Python 서브 프로젝트는 root라는 이름의 transforms.pipelines 엔트리 포인트를 내보내야 합니다. 엔트리 포인트는 Python 서브 프로젝트의 루트 디렉토리에 위치한 setup.py 파일에 정의되어 있습니다. 엔트리 포인트는 모듈 이름과 Pipeline 속성을 참조합니다. 예를 들어, myproject/pipeline.py에서 정의된 "my_pipeline"이라는 Pipeline이 있다고 가정해 보겠습니다:

Copied!
1 2 3 4 5 # transforms.api 라이브러리에서 Pipeline 클래스를 가져옵니다. from transforms.api import Pipeline # Pipeline 객체를 생성하여 my_pipeline 변수에 할당합니다. my_pipeline = Pipeline()

다음과 같이 setup.py에서 이 Pipeline을 등록할 수 있습니다:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import os from setuptools import find_packages, setup # 패키지 설정 setup( # 엔트리 포인트 설정 entry_points={ # 변환 파이프라인 설정 'transforms.pipelines': [ # 프로젝트의 루트 경로와 파이프라인 함수 설정 'root = myproject.pipeline:my_pipeline' ] } )

위의 코드에서, root는 내보내는 Pipeline의 이름을 나타내며, myproject.pipelinePipeline을 포함하는 모듈을 의미하고, my_pipeline은 그 모듈에서 정의된 Pipeline 속성을 가리킵니다.

트랜스폼을 파이프라인에 추가하기

프로젝트의 파이프라인과 연결된 Transform 객체가 데이터셋을 Output으로 선언하면 이 데이터셋을 Foundry에서 빌드할 수 있습니다. Transform 객체를 Pipeline에 추가하는 두 가지 권장 방법은 수동 등록자동 등록입니다.

Tip

보다 고급 워크플로를 가지고 있거나 각 Transform 객체를 프로젝트의 파이프라인에 명확하게 추가하려면 수동 등록을 사용할 수 있습니다. 그렇지 않다면, 등록 코드가 간결하고 포함된 상태를 유지하기 위해 자동 등록을 사용하는 것이 강력히 추천됩니다. 자동 등록을 사용하면, discover_transforms 메소드는 모듈 레벨에서 정의된 모든 Transform 객체를 재귀적으로 찾습니다. 자세한 정보는 아래 섹션을 참조하십시오.

자동 등록

경고

discover_transforms 메소드는 발견하는 모든 모듈을 가져옵니다. 따라서, 가져온 모듈 내의 모든 코드가 실행됩니다.

프로젝트의 복잡성이 증가하면, 수동으로 Transform 객체를 Pipeline에 추가하는 것이 복잡해질 수 있습니다. 따라서, Pipeline 객체는 discover_transforms() 메소드를 제공하여 Python 모듈이나 패키지 내의 모든 Transform 객체를 재귀적으로 찾습니다.

Copied!
1 2 3 4 5 6 7 from transforms.api import Pipeline import my_module # 파이프라인 객체 생성 my_pipeline = Pipeline() # my_module에서 변환(transforms) 찾기 my_pipeline.discover_transforms(my_module)

수동 등록

Transform 객체는 add_transforms() 함수를 사용하여 수동으로 Pipeline에 추가할 수 있습니다. 이 함수는 임의의 수의 Transform 객체를 받아 Pipeline에 추가합니다. 또한 두 개의 Transform 객체가 동일한 결과물 데이터셋을 선언하지 않는지 확인합니다.

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from transforms.api import transform_df, Pipeline, Input, Output # 트랜스폼 함수 정의 @transform_df( Output('/path/to/output/dataset'), # 출력 데이터셋 경로 my_input=Input('/path/to/input/dataset') # 입력 데이터셋 경로 ) def my_compute_function(my_input): return my_input # 입력 데이터셋을 그대로 반환 # 파이프라인 객체 생성 my_pipeline = Pipeline() # 트랜스폼 함수를 파이프라인에 추가 my_pipeline.add_transforms(my_compute_function)

변환 생성

경고

여러 개의 결과물을 생성하는 데이터 변환을 정의하려면 변환 생성을 사용하거나 다중 출력 변환 정의를 사용할 수 있습니다. 변환 생성을 사용하면 각 결과물에 대해 입력을 한 번씩 읽고 처리해야 할 수도 있습니다. 다중 출력 변환에서는 입력을 한 번만 읽고 처리할 수 있습니다.

여러 변환 객체에서 동일한 데이터 변환 로직을 재사용하려 할 수 있습니다. 예를 들어, 다음과 같은 시나리오를 고려해 보십시오:

  • 각종 주에 대한 정보가 포함된 입력 데이터셋이 있습니다. 주별로 입력을 필터링한 다음 다양한 통계를 계산하는 코드가 있습니다.
  • 널 값이 포함될 수 있는 여러 입력 데이터셋이 있습니다. 널 값을 제거하는 코드가 있습니다.

두 경우 모두 동일한 데이터 변환 코드를 여러 변환에 사용하는 것이 유용합니다. 각 출력에 대해 변환 객체를 별도로 정의하는 대신 for 루프를 사용하여 변환 객체를 생성한 다음 프로젝트의 파이프라인에 일괄 등록할 수 있습니다. 변환 생성을 위한 예제는 다음과 같습니다:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from transforms.api import transform_df, Input, Output def transform_generator(sources): # type: (List[str]) -> List[transforms.api.Transform] transforms = [] # 이 예제는 여러 입력 데이터셋을 사용합니다. 당신은 또한 단일 입력 데이터셋에서 여러 출력을 생성할 수 있습니다. # (This example uses multiple input datasets. You can also generate multiple outputs # from a single input dataset.) for source in sources: @transform_df( Output('/sources/{source}/output'.format(source=source)), my_input=Input('/sources/{source}/input'.format(source=source)) ) def compute_function(my_input, source=source): # 함수에서 source 변수를 캡처하려면 기본 키워드 인자로 전달합니다. # (To capture the source variable in the function, you pass it as a defaulted keyword argument.) return my_input.filter(my_input.source == source) transforms.append(compute_function) return transforms TRANSFORMS = transform_generator(['src1', 'src2', 'src3'])
경고

함수에서 소스 변수를 캡처하려면, 기본값이 있는 키워드 인수 source를 사용하여 계산 함수에 전달해야 합니다.

경고

Transform을 생성하기 위해 루프를 사용할 때, Transform 객체를 생성하는 루프는 함수 내부에 있어야 합니다. 이는 파이썬 for 루프가 새로운 범위를 생성하지 않기 때문입니다. 함수를 사용하지 않으면 자동 등록이 잘못되어 for 루프에서 정의된 마지막 Transform 객체만 찾게 됩니다. 이 함수는 생성된 Transforms 객체의 목록을 반환하고 반환 값이 변수와 같아야 합니다. 이러한 기준을 자동 등록을 통해 발견되도록 설정된 모듈 내에서 사용하면 생성된 Transforms와 함께 자동 등록을 사용할 수 있습니다. 또는 수동 등록을 사용할 수 있습니다.

경고

입력 데이터셋의 목록이 빌드 간 변경된 경우(예: 입력 데이터셋의 목록이 빌드 간 수정된 파일에서 읽히는 경우), 빌드가 실패합니다. 이는 새로운 데이터셋 참조가 빌드에 대한 작업 사양에서 찾을 수 없기 때문입니다. 이 문제가 발생할 수 있다면, 대신 Logic Flow를 사용하는 것을 고려하세요.

수동 등록을 사용하는 경우, 생성된 변환을 파이프라인에 추가할 수 있습니다. * 구문에 익숙하지 않은 경우, 이 튜토리얼을 참조하세요.

Copied!
1 2 3 4 5 6 import my_module # 파이프라인 객체 생성 my_pipeline = Pipeline() # my_module 내의 TRANSFORMS를 사용하여 변환 추가 my_pipeline.add_transforms(*my_module.TRANSFORMS)
경고

Code Repositories의 빌드 버튼은 수동 등록에 대해 작동하지 않을 수 있으며 요청된 파일의 파이프라인에서 변환을 찾을 수 없음 오류가 발생할 수 있습니다. Data Lineage 또는 데이터셋 미리보기 애플리케이션으로 이러한 데이터셋을 빌드할 수 있습니다.