注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
Pythonでは、transforms.api.Transform
はデータセットの計算方法の説明です。次の内容を説明します:
configure()
デコレーターで定義される任意の追加設定(これには、実行時に使用するカスタムTransformsプロファイルが含まれます)入力データセットと出力データセット、および変換コードはTransform
オブジェクトにラップされ、Pipeline
に登録されます。Transform
オブジェクトを手動で作成するべきではありません。代わりに、以下に説明するデコレーターのいずれかを使用してください。
データ変換は、pyspark.sql.DataFrame
オブジェクトとファイルの両方の観点から表現できることを覚えておくことが重要です。DataFrame
オブジェクトに依存する変換の場合、transformデコレーターを使用して入力データセットを含むDataFrame
にアクセスするメソッドを明示的に呼び出すか、単純にDataFrame transformデコレーターを使用できます。ファイルに依存する変換の場合、transformデコレーターを使用してから、データセット内のファイルにアクセスします。データ変換がPandasライブラリのみを使用する場合、Pandas transformデコレーターを使用できます。
単一のPythonファイル内に複数のTransform
オブジェクトを定義することができます。また、現在すべての変換はトランザクションタイプSNAPSHOTで実行されます。
transforms.api.transform()
デコレーターは、DataFrame
オブジェクトまたはファイルに依存するデータ変換を記述する場合に使用できます。このデコレーターは、キーワード引数として複数のtransforms.api.Input
およびtransforms.api.Output
仕様を受け入れます。Foundryビルド中に、これらの仕様はそれぞれtransforms.api.TransformInput
およびtransforms.api.TransformOutput
オブジェクトに解決されます。これらのTransformInput
およびTransformOutput
オブジェクトは、計算関数内のデータセットへのアクセスを提供します。
入力と出力に使用されるキーワード名は、ラップされた計算関数のパラメーター名と一致している必要があります。
transform()
デコレーターを使用してTransform
オブジェクトを作成するための簡単な例を見てみましょう。ここでは、/examples/students_hair_eye_color
という小さなサンプルデータセットを使用します。データセットのプレビューは次の通りです:
>>> students_input = foundry.input('/examples/students_hair_eye_color') # foundryから生徒の髪の色と目の色のデータを入力します
>>> students_input.dataframe().sort('id').show(n=3) # データフレームを'id'でソートし、上位3行を表示します
+---+-----+-----+----+
| id| hair| eye| sex|
+---+-----+-----+----+
| 1|Black|Brown|Male| # 男性、髪の色は黒、目の色は茶色
| 2|Brown|Brown|Male| # 男性、髪の色は茶色、目の色も茶色
| 3| Red|Brown|Male| # 男性、髪の色は赤、目の色は茶色
+---+-----+-----+----+
上位3行のみを表示
これで、入力として /examples/students_hair_eye_color
を取り、出力として /examples/hair_eye_color_processed
を作成する Transform
を定義できます:
Copied!1 2 3 4 5 6 7 8 9 10 11 12
from transforms.api import transform, Input, Output @transform( hair_eye_color=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def filter_hair_color(hair_eye_color, processed): # type: (TransformInput, TransformOutput) -> None # 茶色の髪の生徒のデータをフィルタリング filtered_df = hair_eye_color.dataframe().filter(hair_eye_color.dataframe().hair == 'Brown') # 処理済みのデータを書き込む processed.write_dataframe(filtered_df)
“hair_eye_color”の入力名と“processed”の出力名は、filter_hair_color
計算関数のパラメーター名として使用されます。
さらに、filter_hair_color
は、TransformInput
からdataframe()
メソッドを使用してDataFrame
を読み込みます。そのDataFrame
は、通常のPySpark関数であるfilter()
を使用してフィルター処理されます。このフィルター処理されたDataFrame
は、write_dataframe()
メソッドを使用して“processed”という名前の出力に書き込まれます。
TransformInput
によって返されるDataFrame
オブジェクトは、通常のPySparkデータフレームです。PySparkの使用方法について詳しくは、オンラインで利用可能なSpark Python APIドキュメンテーションを参照してください。
データ変換がファイルへのアクセスに依存する場合、DataFrame
オブジェクトではなく、ファイルアクセスのセクションを参照してください。
複数の出力を持つ変換は、単一の入力データセットをいくつかの部分に分割する必要がある場合に便利です。複数の出力を持つ変換は transforms.api.transform()
デコレーターでのみサポートされています。
/examples/students_hair_eye_color
データセットを思い出してください:
>>> students_input = foundry.input('/examples/students_hair_eye_color') # `/examples/students_hair_eye_color`からデータを取得します
>>> students_input.dataframe().sort('id').show(n=3) # 'id'に基づいてデータをソートし、上位3行を表示します
+---+-----+-----+----+
| id| hair| eye| sex|
+---+-----+-----+----+
| 1|Black|Brown|Male|
| 2|Brown|Brown|Male|
| 3| Red|Brown|Male|
+---+-----+-----+----+
上位3行のみを表示しています
これで、入力を分割するために、複数の Output
仕様を transform()
デコレーターに渡すことができます。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
from transforms.api import transform, Input, Output @transform( hair_eye_color=Input('/examples/students_hair_eye_color'), males=Output('/examples/hair_eye_color_males'), females=Output('/examples/hair_eye_color_females'), ) def brown_hair_by_sex(hair_eye_color, males, females): # type: (TransformInput, TransformOutput, TransformOutput) -> None # 茶色の髪のデータフレームを作成 brown_hair_df = hair_eye_color.dataframe().filter(hair_eye_color.dataframe().hair == 'Brown') # 茶色の髪の男性データを書き込む males.write_dataframe(brown_hair_df.filter(brown_hair_df.sex == 'Male')) # 茶色の髪の女性データを書き込む females.write_dataframe(brown_hair_df.filter(brown_hair_df.sex == 'Female'))
フィルター処理して茶髪を一度だけ選択したことに注意してください。その後、フィルター処理したデータセットを共有して、複数の出力データセットを生成できました。
Pythonでのデータ変換では、DataFrame
オブジェクトを読み込み、処理し、書き込むことが一般的です。データ変換がDataFrame
オブジェクトに依存している場合、transforms.api.transform_df()
デコレーターを使用できます。このデコレーターはDataFrame
オブジェクトを注入し、計算関数がDataFrame
を返すことを期待します。
あるいは、より一般的なtransform()
デコレーターを使用し、明示的にdataframe()
メソッドを呼び出して入力データセットを含むDataFrame
にアクセスすることもできます。transform()
デコレーターは、より強力なtransforms.api.TransformInput
とtransforms.api.TransformOutput
オブジェクトを注入し、DataFrame
オブジェクトではなく、これらを注入します。
transform_df()
デコレーターは、キーワード引数としていくつかのtransforms.api.Input
仕様を受け入れ、位置引数として単一のtransforms.api.Output
仕様を受け入れます。Pythonの要求に従って、位置Output
引数が最初に現れ、次にキーワードInput
引数が続きます。
次に、transform_df()
デコレーターを使用してTransform
オブジェクトを作成するための簡単な例を見てみましょう。ここでは、上記の小さなサンプルデータセット/examples/students_hair_eye_color
を使用します。以下はデータセットのプレビューです:
# コードの説明
# students_input は、foundry.input() を使って '/examples/students_hair_eye_color' というパスからデータを取得しています。
# students_input.dataframe() は、データフレームに変換しています。
# sort('id') を使って、'id' カラムに基づいてデータフレームを並び替えています。
# 最後に、show(n=3) を使って最初の3行を表示しています。
>>> students_input = foundry.input('/examples/students_hair_eye_color')
>>> students_input.dataframe().sort('id').show(n=3)
+---+-----+-----+----+
| id| hair| eye| sex|
+---+-----+-----+----+
| 1|Black|Brown|Male|
| 2|Brown|Brown|Male|
| 3| Red|Brown|Male|
+---+-----+-----+----+
only showing top 3 rows
今度は、上記の トランスフォームデコレータ セクションの例を修正して、transform_df()
デコレータを使用します。/examples/students_hair_eye_color
を入力として取り、/examples/hair_eye_color_processed
を出力として作成する Transform
を定義します:
Copied!1 2 3 4 5 6 7 8 9 10
from transforms.api import transform_df, Input, Output @transform_df( Output('/examples/hair_eye_color_processed'), hair_eye_color=Input('/examples/students_hair_eye_color') ) def filter_hair_color(hair_eye_color): # type: (pyspark.sql.DataFrame) -> pyspark.sql.DataFrame # 髪の色がブラウンのデータだけをフィルタリングする return hair_eye_color.filter(hair_eye_color.hair == 'Brown')
filter_hair_color
計算関数では、“hair_eye_color”という入力名がパラメーター名として使用されています。さらに、Pythonでは位置引数がキーワード引数より前に来る必要があるため、Output
引数は Input
引数より前に表示されます。
transform_pandas
デコレータは、メモリに収まるデータセットでのみ使用する必要があります。それ以外の場合は、transform_df
デコレータを使用してデータ変換を書き、入力データセットをtoPandas
メソッドを使用してPandas DataFramesに変換する前にフィルター処理する必要があります。
meta.yaml
に PyArrow を依存関係として追加して toPandas
メソッドを使用することを推奨します。これにより、Arrow と Pandas DataFrame の変換の最適化が可能になります。
データ変換が Pandas ライブラリだけに依存する場合、transforms.api.transform_pandas()
デコレータを使用できます。Pandas ライブラリを使用するには、meta.yml
ファイルの run 依存関係に pandas
を追加する必要があります。詳細については、meta.yml ファイルに関するセクションを参照してください。
transform_pandas()
デコレータは、transform_df()
デコレータと同様ですが、transform_pandas()
は入力データセットを pandas.DataFrame
オブジェクトに変換し、pandas.DataFrame
の戻り値タイプを受け入れます。
transform_pandas()
デコレータは、transforms.api.Input
仕様の数をキーワード引数として受け入れ、transforms.api.Output
仕様を1つの位置引数として受け入れます。Python の要件に従って、位置引数の Output
は最初に表示され、次にキーワード引数の Input
が表示されます。
transform_pandas()
デコレータを使用して Transform
オブジェクトを作成する簡単な例を一緒に見てみましょう。上記と同じサンプルデータセット /examples/students_hair_eye_color
を使用します。データセットのプレビューは以下のとおりです:
>>> students_input = foundry.input('/examples/students_hair_eye_color') # foundryのinputメソッドを使って、'/examples/students_hair_eye_color'というパスからデータを取得します
>>> students_input.dataframe().sort('id').show(n=3) # 取得したデータをデータフレームに変換し、'id'でソートします。その上位3行を表示します。
+---+-----+-----+----+
| id| hair| eye| sex|
+---+-----+-----+----+
| 1|Black|Brown|Male|
| 2|Brown|Brown|Male|
| 3| Red|Brown|Male|
+---+-----+-----+----+
only showing top 3 rows # 最初の3行だけを表示します
これで、入力として /examples/students_hair_eye_color
を取り、出力として /examples/hair_eye_color_processed
を作成する Transform
を定義できます:
Copied!1 2 3 4 5 6 7 8 9 10
from transforms.api import transform_pandas, Input, Output @transform_pandas( Output('/examples/hair_eye_color_processed'), hair_eye_color=Input('/examples/students_hair_eye_color') ) def filter_hair_color(hair_eye_color): # type: (pandas.DataFrame) -> pandas.DataFrame # 茶色の髪の色を持つ学生を抽出する return hair_eye_color[hair_eye_color['hair'] == 'Brown']
「hair_eye_color」の入力名は、filter_hair_color
の計算関数でパラメーター名として使用されています。さらに、Pythonでは位置引数がキーワード引数よりも前に来る必要があるため、Output
引数はInput
引数よりも前に表示されます。
この例では、pyspark.sql.DataFrame
オブジェクトではなく、受け取りと返却がpandas.DataFrame
の計算関数からTransform
を作成します。これは上記のDataFrame Transformデコレータセクションの例とは異なります。PandasのデータフレームをPySparkのデータフレームに変換するには、createDataFrame()
メソッドを使用できます。このメソッドは、ユーザーのTransformコンテキストのspark_session
属性で呼び出します。
入力データセット以外の事に依存するデータ変換が必要な場合もあります。たとえば、現在のSparkセッションにアクセスしたり、外部サービスに連絡したりする変換が必要になるかもしれません。そのような場合は、transforms.api.TransformContext
オブジェクトを変換に注入できます。
TransformContext
オブジェクトを注入するには、計算関数がctx
という名前のパラメーターを受け入れる必要があります。これは、入力または出力がctx
という名前になることはできないことも意味します。例えば、Pythonのデータ構造からDataFrame
を作成する場合など、TransformContext
オブジェクトを使用できます。
Copied!1 2 3 4 5 6 7 8 9 10 11 12
from transforms.api import transform_df, Output @transform_df( Output('/examples/context') ) def generate_dataframe(ctx): # type: (TransformContext) -> pyspark.sql.DataFrame # ctx.spark_session.createDataFrame を使用して、新しいDataFrameを生成します。 # このDataFrameには、['a', 1], ['b', 2], ['c', 3] というデータが含まれており、スキーマは ['letter', 'number'] となります。 return ctx.spark_session.createDataFrame([ ['a', 1], ['b', 2], ['c', 3] ], schema=['letter', 'number'])
TLLVが正しく機能するためには、コード内ですべてのインポートをモジュールレベルで宣言し、他のモジュール内のオブジェクトをパッチや変更を試みないようにする必要があります。プロジェクトでこれが満たされていない場合、TLLVを無効にする必要があります。詳細については、以下のコード例を参照してください。
TLLVはデフォルトで有効になっています。 TLLVを無効にするには、transformsPythonの設定でtllvをfalseに設定します。この設定は、Transforms Pythonサブプロジェクト内の build.gradle
ファイルにあります。
transformsPython {
tllv false # tllvをfalseに設定します。これは、Python変換を無効にします。
}
トランスフォームのバージョンは、ロジックの陳腐化を検討する際に、2つのトランスフォームのバージョンを比較するために使用される文字列です。トランスフォームの入力が変更されず、トランスフォームのバージョンが変更されない場合、トランスフォームの出力は最新の状態となります。バージョンが変更されると、トランスフォームの出力は無効になり、再計算されます。
デフォルトでは、トランスフォームのバージョンには以下が含まれます。
これらのいずれかが変更されると、バージョン文字列も変更されます。 上記の部分でカバーされていないファイルで変更が発生した場合に出力を無効にしたい場合は、transformsPython設定のtllvFilesを設定してください。例えば、ファイルの設定を読み込んでいて、設定が変更されたときに出力を無効にしたい場合などが考えられます。
transformsPython {
# tllvFiles は、プロジェクトディレクトリに対して相対的なパスで含めたいファイルのパスを表します。
tllvFiles = [
'path/to/file/you/want/to/include/relative/to/project/directory'
]
}
プロジェクトの依存関係のバージョンが変更されたときに出力の無効化を回避したい場合は、tllvIncludeDeps を false に設定してください。
transformsPython {
tllvIncludeDeps false // tllvIncludeDepsをfalseに設定します。これは、依存関係を含むかどうかを制御します。
}
次に、有効なインポートと無効なインポートのコード例を考えてみましょう:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
# モジュールのトップでのみインポートしている場合、心配する必要はありません。 from transforms.api import transform_df, Input, Output from myproject.datasets import utils from myproject.testing import test_mock as tmock import importlib # モジュールスコープで `__import__` または `importlib` を使用するのは問題ありません。 logging = __import__('logging') my_compute = importlib.import_module('myproject.compute') def helper(x): # これは無効です。関数やクラスメソッドでインポートする場合は、TLLV を無効にする必要があります。 # すべてのインポートはモジュールスコープにする必要があります。 import myproject.helpers as myhelp return myhelp.round(x) # トランスフォーム関数定義 @transform_df( Output("/path/to/output/dataset"), my_input=Input("/path/to/input/dataset"), ) def my_compute_function(my_input): # これは無効です。関数内でインポートを使用したい場合は、TLLV を無効にする必要があります! ihelper = __import__('myproject.init_helper') my_functions = importlib.import_module('myproject.functions') return my_input
TLLV を使用している場合は、拡張モジュール を使用して TLLV を無効にする必要があります。
リポジトリ内の各 Pythonトランスフォーム サブプロジェクトは、1つの transforms.api.Pipeline
オブジェクトを公開します。この Pipeline
オブジェクトは、以下の目的で使用されます。
transforms.api.Transform
オブジェクトを見つけて実行するPython トランスフォーメーションを実行するためのランタイムは、プロジェクトの Pipeline
を見つけることができる必要があります。Pipeline
をエクスポートするには、Pythonトランスフォームサブプロジェクト内の setup.py
ファイルの entry_points
引数に追加します。エントリーポイントに関する詳細は、setuptools ライブラリのドキュメント を参照してください。
デフォルトでは、各 Python サブプロジェクトは root
という名前の transforms.pipelines
エントリーポイントをエクスポートする必要があります。エントリーポイントは、Python サブプロジェクトのルートディレクトリにある setup.py
ファイルで定義されていることを思い出してください。エントリーポイントは、モジュール名と Pipeline
属性を参照します。
例えば、myproject/pipeline.py
で定義された “my_pipeline” という名前の Pipeline
があるとします。
Copied!1 2 3 4 5
# transforms.api モジュールから Pipeline をインポートします from transforms.api import Pipeline # Pipeline インスタンスを作成します my_pipeline = Pipeline()
次のようにして setup.py
でこの Pipeline
を登録できます:
Copied!1 2 3 4 5 6 7 8 9 10
import os from setuptools import find_packages, setup setup( entry_points={ # エントリーポイントを設定 'transforms.pipelines': [ # トランスフォーム・パイプラインを設定 'root = myproject.pipeline:my_pipeline' # ルートパイプラインをmyproject.pipelineから読み込む ] } )
上記のコードでは、root
はエクスポートするPipeline
の名前を指し、myproject.pipeline
はPipeline
を含むモジュールを指し、my_pipeline
はそのモジュールで定義されたPipeline
属性を指します。
プロジェクトのPipelineに関連付けられたTransform
オブジェクトがデータセットをOutput
として宣言すると、そのデータセットをFoundryで構築できます。Transform
オブジェクトをPipeline
に追加する推奨方法は手動登録と自動登録の2つです。
より高度なワークフローを持っていたり、各Transform
オブジェクトをプロジェクトのPipelineに明示的に追加したい場合は、手動登録を使用できます。それ以外の場合は、登録コードが簡潔で包含的であることを確認するために、自動登録を強く推奨します。自動登録では、discover_transforms
メソッドがモジュールレベルで定義されたすべてのTransform
オブジェクトを再帰的に発見します。詳細は以下のセクションを参照してください。
discover_transforms
メソッドは見つけたすべてのモジュールをインポートします。その結果、インポートされたモジュール内の任意のコードが実行されます。
プロジェクトの複雑性が増すにつれて、Transform
オブジェクトを手動でPipeline
に追加することは煩雑になる可能性があります。そのため、Pipeline
オブジェクトはdiscover_transforms()
メソッドを提供して、Pythonモジュールまたはパッケージ内のすべてのTransform
オブジェクトを再帰的に発見します。
Copied!1 2 3 4 5 6 7 8 9 10 11
# transforms.api モジュールから Pipeline をインポートします from transforms.api import Pipeline # 自身が定義した my_module をインポートします import my_module # Pipeline インスタンスを作成します my_pipeline = Pipeline() # my_module 内のすべての変換(transforms)を Pipeline に追加します my_pipeline.discover_transforms(my_module)
Transform
オブジェクトは、add_transforms()
関数を使用して、Pipeline
に手動で追加できます。この関数は、任意の数の Transform
オブジェクトを受け取り、Pipeline に追加します。また、同じ出力データセットを宣言する2つの Transform
オブジェクトがないことを確認します。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
# transforms.apiから必要な関数やクラスをインポートします from transforms.api import transform_df, Pipeline, Input, Output # transform_dfデコレータを使用して、データ変換の関数を定義します @transform_df( # Outputは変換後のデータセットの保存先を指定します Output('/path/to/output/dataset'), # Inputは変換前のデータセットの場所を指定します my_input=Input('/path/to/input/dataset') ) def my_compute_function(my_input): # この例では、入力データをそのまま出力として返しています return my_input # Pipelineクラスのインスタンスを作成します my_pipeline = Pipeline() # add_transformsメソッドを使用して、定義したデータ変換関数をパイプラインに追加します my_pipeline.add_transforms(my_compute_function)
複数の出力を生成するデータ変換を定義したい場合は、トランスフォーム生成を使用するか、複数の出力を持つトランスフォームを定義する ことができます。トランスフォーム生成では、すべての出力のために同じ入力を一度に読み込んで処理する必要がある場合があります。複数の出力を持つトランスフォームでは、入力を一度だけ読み込んで処理することが可能です。
ユーザーは、複数のトランスフォームオブジェクトで同じデータ変換ロジックを再利用したい場合があります。例えば、以下のシナリオを考えてみましょう。
どちらのケースでも、複数のトランスフォームで同じデータ変換コードを使用すると便利です。ユーザーの出力ごとに別々のトランスフォームオブジェクトを定義する代わりに、forループを使用してトランスフォームオブジェクトを生成し、それらを一括でプロジェクトのパイプラインに登録することができます。ここにトランスフォームを生成する例を示します。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
from transforms.api import transform_df, Input, Output def transform_generator(sources): # type: (List[str]) -> List[transforms.api.Transform] transforms = [] # この例では複数の入力データセットを使用しています。単一の入力データセットからも複数の出力を生成することが可能です。 for source in sources: @transform_df( Output('/sources/{source}/output'.format(source=source)), my_input=Input('/sources/{source}/input'.format(source=source)) ) def compute_function(my_input, source=source): # 関数内でsource変数をキャプチャするために、デフォルトのキーワード引数として渡します。 return my_input.filter(my_input.source == source) transforms.append(compute_function) return transforms TRANSFORMS = transform_generator(['src1', 'src2', 'src3'])
ソース変数を関数に取り込むには、デフォルトのキーワード引数 source
で関数に渡す必要があります。
Transformsを生成するためのループを使用する場合、Pythonのforループが新しいスコープを作成しないため、Transformオブジェクトを生成するループは関数内になければなりません。関数を使用しない場合、自動登録が誤ってforループで定義された最後のTransformオブジェクトのみを検出します。この関数は、生成されたTransformsオブジェクトのリストを返し、戻り値は変数に設定されるべきです。これらの条件を、自動登録で検出されるように設定されたモジュール内で満たすことで、生成されたTransformsで自動登録を使用することができます。または、手動登録を使用することができます。
入力データセットのリストがビルド間で変更される場合(例えば、入力データセットのリストがビルド間で変更されるファイルから読み込まれる場合)、新しいデータセットの参照がビルドのジョブスペックに見つからないため、ビルドが失敗します。これが問題になる可能性がある場合は、代わりに Logic Flow を使用してください。
手動登録を使用する場合、生成されたTransformsをパイプラインに追加できます。*
構文に慣れていない場合は、このチュートリアルを参照してください。
Copied!1 2 3 4 5 6 7 8
# my_moduleをインポートします import my_module # Pipelineインスタンスを生成します my_pipeline = Pipeline() # my_moduleからTRANSFORMSを追加します my_pipeline.add_transforms(*my_module.TRANSFORMS)
手動での登録では、コードリポジトリの「ビルド」ボタンが機能しない場合があり、リクエストされたファイルからパイプラインに変換が見つからないというエラーが表示されることに注意してください。これらのデータセットは、Data LineageまたはDataset Previewアプリケーションでビルドすることができます。