注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
Transforms Python APIは、Pipeline
を構築するためのクラスとデコレータを提供します。このページには利用可能な関数に関する情報が含まれています。また、クラス についてさらに読むこともできます。
関数 | 説明 |
---|---|
configure ([profile]) | トランスフォームの設定を変更するためのデコレータ。 |
incremental ([require_incremental, ...]) | 入力と出力をtransforms.api.incrementalの対応物に変換するデコレータ。 |
lightweight ([cpu_cores, memory_mb, memory_gb, gpu_type, container_image, container_tag, container_shell_command]) | デコレータで修飾されたトランスフォームをコンテナトランスフォームで実行するためのデコレータ。 |
transform (ios) | 計算関数を Transform オブジェクトとしてラップする。 |
transform_df (output, inputs) | ラップされた計算関数をデータフレームトランスフォームとして登録する。 |
transform_pandas (output, inputs) | ラップされた計算関数をpandasトランスフォームとして登録する。 |
transform_polars (output, inputs) | ラップされた計算関数をPolarsトランスフォームとして登録する。 |
configure
transforms.api.configure
(profile=None, allowed_run_duration=None, run_as_user=False)configure
デコレータは Transform
をラップするために使用されなければなりません。Copied!1 2 3 4 5 6 7 8 9 10 11
# `configure` デコレータは、実行時の設定を指定するために使用します。 # `profile` パラメータには、リソースのプロファイルを指定します。 # ここでは 'EXECUTOR_MEMORY_MEDIUM' が指定されており、これは実行者のメモリが中程度であることを意味します。 >>> @configure(profile=['EXECUTOR_MEMORY_MEDIUM']) # `transform` デコレータは、データの変換ロジックを指定するために使用します。 ... @transform(...) # `my_compute_function` という名前の関数を定義します。 ... def my_compute_function(...): ... pass
TypeError
↗
Transform
オブジェクトでない場合。incremental
transforms.api.incremental
(require_incremental=False, semantic_version=1, snapshot_inputs=None)transforms.api.incremental
に変換するためのデコレーター。incremental
デコレーターは Transform
をラップするために使用する必要があります。Copied!1 2 3 4 5 6
# @incremental() は関数の処理を逐次的に行うためのデコレーターです # @transform(...) は関数の出力を変換するためのデコレーターです >>> @incremental() ... @transform(...) ... def my_compute_function(...): # my_compute_function という名前の関数を定義しています ... pass # ここに関数の具体的な処理を書く
デコレーターは、出力データセットからビルド履歴を読み取り、最後のビルド時点での入力の状態を特定します。この情報を使用して、TransformInput
、TransformOutput
、およびTransformContext
オブジェクトを、それぞれのインクリメンタルな対応物であるIncrementalTransformInput
、IncrementalTransformOutput
、およびIncrementalTransformContext
に変換します。
このデコレーターは、transform_df()
およびtransform_pandas()
デコレーターをラップするためにも使用できます。これらのデコレーターは、引数なしで入力に対してdataframe()
およびpandas()
を呼び出し、PySparkおよびpandasのDataFrameオブジェクトを抽出します。したがって、使用される読み取りモードは常にadded
となり、書き込みモードはincremental
デコレーターによって決定されます。非デフォルトモードのいずれかを読み書きするには、transform()
デコレーターを使用する必要があります。
PySparkまたはpandasトランスフォームの追加出力行が追加入力行の関数のみである場合(APPEND
の例を参照)、デフォルトモードは正しいインクリメンタルトランスフォームを生成します。
ユーザーのトランスフォームがSNAPSHOT
トランザクションを含むデータセットを入力として取るが、それがトランスフォームのインクリメンタル実行能力に影響を与えない場合(リファレンステーブルなど)、トランスフォームを完全なSNAPSHOT
として実行しないようにするためにsnapshot_inputs
引数を確認してください。
ユーザーのトランスフォームが複雑なロジック(結合、集計、distinctなど)を実行する場合、このデコレーターを使用する前にインクリメンタルドキュメントを読むことをお勧めします。
True
の場合、トランスフォームはインクリメンタルでない実行を拒否します。ただし、トランスフォームがこれまでに実行されたことがない場合は例外です。これは、すべての出力データセットにコミットされたトランザクションがないことに基づいて判断されます。1
です。この数値はトランスフォームのセマンティックな性質を表します。既存の出力を無効にするようなトランスフォームのロジックが変更された場合、この数値を変更する必要があります。この数値を変更すると、次回のトランスフォーム実行はインクリメンタルでない実行となります。SNAPSHOT
トランザクションがトランスフォームの現在の出力を無効にしない入力。たとえば、参照テーブルの更新は、以前に計算された出力が誤っていることを意味しません。これ以外のすべての入力に新しいデータが追加されているか、または新しいデータがない場合に、トランスフォームはインクリメンタルに実行されます。snapshot_inputsを読み取るとき、IncrementalTransformInput
は入力データセットの現在のビューのみを公開します。True
の場合、foundry-retention
による削除はインクリメンタル性を損なうことはありません。strict_append
パラメーターがTrue
に設定されており、入力データセットがインクリメンタルである場合、基盤となるFoundryのトランザクションタイプはAPPEND
に設定され、インクリメンタル書き込みにはAPPEND
トランザクションが使用されます。既存のファイルを上書きしようとすると例外が発生します。
入力データセットがインクリメンタルでない場合、strict_append
はSNAPSHOT
として実行されます。コードがAPPEND
としてインクリメンタルに実行されることを保証するためにrequire_incremental=True
を使用する必要があります。既存のファイルを上書きしようとすると成功します。
書き込み操作は、ParquetサマリーメタデータやHadoopのSUCCESS
ファイルなどの補助ファイルを含め、ファイルを上書きしないことがあります。すべてのFoundry形式のインクリメンタル書き込みは、strict_append
モードをサポートする必要があります。False
です。True
に設定すると、v2インクリメンタルセマンティクスを使用します。v2とv1のインクリメンタルセマンティクスの間に動作の違いはなく、すべてのユーザーがこれをTrue
に設定することをお勧めします。非カタログのインクリメンタル入力と出力は、v2セマンティクスを使用する場合にのみサポートされます。TypeError
↗
Transform object
でない場合。KeyError
↗
Transform object
に存在しない場合。lightweight
transforms.api.lightweight
(cpu_cores=2, memory_mb=None, memory_gb=16, gpu_type=None, container_image=None, container_tag=None, container_shell_command=None)lightweight
デコレーターは、Transform
をラップするために使用する必要があります。Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
>>> @lightweight ... @transform( ... my_input=Input('/input'), # '/input'のパスからデータを読み込む ... my_output=Output('/output') # '/output'のパスへデータを書き出す ... ) ... def compute_func(my_input, my_output): ... my_output.write_pandas(my_input.pandas()) # pandas形式で出力を書き出す >>> @lightweight() ... @transform( ... my_input=Input('/input'), # '/input'のパスからデータを読み込む ... my_output=Output('/output') # '/output'のパスへデータを書き出す ... ) ... def compute_func(my_input, my_output): ... for file in my_input.filesystem().ls(): # '/input'の中の全ファイルを読み込む ... with my_input.filesystem().open(file.path) as f1: # ファイルを開く ... with my_output.filesystem().open(file.path, "w") as f2: # 書き出し用のファイルを開く ... f2.write(f1.read()) # ファイルを読み込み、出力へ書き出す >>> @lightweight(memory_gb=3.5) # メモリの使用量を3.5GBに制限 ... @transform_pandas( ... Output('/output'), # '/output'のパスへデータを書き出す ... my_input=Input('/input') # '/input'のパスからデータを読み込む ... ) ... def compute_func(my_input): ... return my_input # 入力をそのまま出力する >>> @lightweight(container_image='my-image', container_tag='0.0.1') # コンテナのイメージとタグを指定 ... @transform(my_output=Output('ri...my_output')) # 'ri...my_output'のパスへデータを書き出す ... def run_data_generator_executable(my_output): ... os.system('$USER_WORKING_DIR/data_generator') # データジェネレータの実行 ... my_output.write_table(pd.read_csv('data.csv')) # 'data.csv'を読み込み、テーブル形式で出力する
軽量トランスフォームは、単一ノードでSparkを使用せずに実行されるトランスフォームです。軽量トランスフォームは、小規模から中規模のデータセットに対して高速かつコスト効果の高い方法です。ただし、軽量トランスフォームは、通常のトランスフォームのAPIのサブセット(pandasやファイルシステムAPIなど)のみをサポートする一方で、データセットにアクセスするためのさらに多くのメソッドを提供します。軽量トランスフォームの詳細については、documentationをご覧ください。
このデコレーターを使用するには、foundry-transforms-lib-python
を依存関係として追加する必要があります。
container_image
、container_tag
、またはcontainer_shell_command
のいずれかが設定されている場合、container_image
とcontainer_tag
の両方が設定されている必要があります。container_shell_command
が設定されていない場合、デフォルトのエントリーポイントが使用され、Python環境をブートストラップし、トランスフォームで指定されたユーザーコードを実行します。
container_*
引数を指定することは、BYOC(Bring-Your-Own-Container)ワークフローと呼ばれます。これにより、ユーザーのコードリポジトリからすべてのファイルが実行時に$USER_WORKING_DIR/user_code
内で利用可能になり、Python環境も利用可能になります。
container_image
とcontainer_tag
で指定されたイメージは、コードリポジトリの製作物を利用したリポジトリから利用可能である必要があります。詳細については、BYOC documentationをご覧ください。
2
です。memory_gb
またはmemory_mb
のいずれかを指定できますが、両方を同時に指定することはできません。16GB
です。transform
transforms.api.transform
(ios)計算関数をTransform
オブジェクトとしてラップします。
transform
デコレーターは、計算関数からTransform
オブジェクトを構築するために使用されます。入力および出力に使用される名前は、ラップされた計算関数のパラメーター名である必要があります。計算時に、関数にはTransformInput
およびTransformOutput
オブジェクトとして入力および出力が渡されます。
Copied!1 2 3 4 5 6 7 8 9 10 11
>>> @transform( ... first_input=Input('/path/to/first/input/dataset'), # 最初の入力データセットへのパス ... second_input=Input('/path/to/second/input/dataset'), # 二番目の入力データセットへのパス ... first_output=Output('/path/to/first/output/dataset'), # 最初の出力データセットへのパス ... second_output=Output('/path/to/second/output/dataset'), # 二番目の出力データセットへのパス ... ) ... def my_compute_function(first_input, second_input, first_output, second_output): ... # type: (TransformInput, TransformInput, TransformOutput, TransformOutput) -> None ... # 関数の型ヒント:入力はTransformInput、出力はTransformOutput、戻り値はなし ... first_output.write_dataframe(first_input.data_dataframe()) # 最初の入力データセットからデータフレームを取得し、最初の出力データセットに書き込む ... second_output.write_dataframe(second_input.data_dataframe()) # 二番目の入力データセットからデータフレームを取得し、二番目の出力データセットに書き込む
compute 関数は、データを出力先に書き込む役割を担っています。
必要に応じて、TransformContext および対応する SparkSession にも compute 関数内でアクセスできます。これを使用して、空のデータフレームを作成したり、Spark の設定を適用したりすることができます。可能な限り、SparkSession
オブジェクトを使用して Spark の設定値を設定するのではなく、既存のデフォルト Spark プロファイルを使用することをお勧めします。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13
>>> @transform( ... output=Output('/path/to/first/output/dataset'), ... ) ... def my_compute_function(ctx, output): ... # type: (TransformContext, TransformOutput) -> None ... ... # この例では、Sparkセッションを使用して空のデータフレームを作成します。 ... columns = [ ... StructField("col_a", StringType(), True) ... ] ... empty_df = ctx.spark_session.createDataFrame([], schema=StructType(columns)) ... ... output.write_dataframe(empty_df)
transform_df
transforms.api.transform_df
(output, inputs)transform_df
デコレーターは、pyspark.sql.DataFrame
↗ オブジェクトを受け取り、返す計算関数から Transform
オブジェクトを構築するために使用されます。 transform()
デコレーターと同様に、入力名は計算関数のパラメーター名になります。ただし、transform_df
は位置引数として単一の Output
スペックのみを受け入れます。計算関数の戻り値も DataFrame
↗ であり、単一の出力データセットに自動的に書き出されます。Copied!1 2 3 4 5 6 7 8 9
>>> @transform_df( ... Output('/path/to/output/dataset'), # 名前のないOutput spec ... first_input=Input('/path/to/first/input/dataset'), # 最初の入力データセット ... second_input=Input('/path/to/second/input/dataset'), # 二番目の入力データセット ... ) ... def my_compute_function(first_input, second_input): ... # type: (pyspark.sql.DataFrame, pyspark.sql.DataFrame) -> pyspark.sql.DataFrame ... # 最初のデータフレームと二番目のデータフレームを結合します。 ... return first_input.union(second_input)
オプションとして、TransformContext と対応する SparkSession も計算関数内でアクセスできます。これを使用して空のデータフレームを作成したり、Spark の設定を適用したりなどができます。可能な限り、SparkSession オブジェクト経由で Spark config 値を設定する代わりに、既存のデフォルト Spark プロファイルを使用することをお勧めします。
Copied!1 2 3 4 5 6 7 8 9 10 11
>>> @transform_df( ... Output('/path/to/output/dataset') ... ) ... def my_compute_function(ctx): ... # type: (TransformContext) -> pyspark.sql.DataFrame ... ... # この例では、Sparkセッションを使用して空のデータフレームを作成しています。 ... columns = [ ... StructField("col_a", StringType(), True) ... ] ... return ctx.spark_session.createDataFrame([], schema=StructType(columns))
transform_pandas
transforms.api.transform_pandas
(output, inputs)pandas ライブラリを使用するには、meta.yml
ファイルに pandas
を run 依存関係として追加する必要があります。
transform_pandas
デコレータは、pandas.DataFrame
↗ オブジェクトを受け取り返す計算関数から Transform
オブジェクトを構築するために使用されます。このデコレータは transform_df()
デコレータと似ていますが、計算の前に pyspark.sql.DataFrame
↗ オブジェクトが pandas.DataFrame
↗ オブジェクトに変換され、計算後に再び変換されます。
Copied!1 2 3 4 5 6 7 8 9
>>> @transform_pandas( ... Output('/path/to/output/dataset'), # 名前がないOutputの仕様 ... first_input=Input('/path/to/first/input/dataset'), # 最初の入力データセット ... second_input=Input('/path/to/second/input/dataset'), # 二番目の入力データセット ... ) ... def my_compute_function(first_input, second_input): ... # type: (pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame ... # 最初の入力データフレームと二番目の入力データフレームを連結する ... return first_input.concat(second_input)
transform_pandas
はメモリに収まるデータセットでのみ使用する必要があることに注意してください。より大きなデータセットを pandas に変換する前にフィルター処理したい場合は、transform_df()
デコレーターと pyspark.sql.SparkSession.createDataFrame()
↗ メソッドを使用してトランスフォームを記述する必要があります。
Copied!1 2 3 4 5 6 7 8 9 10
>>> @transform_df( ... Output('/path/to/output/dataset'), # 名前のないOutput spec ... first_input=Input('/path/to/first/input/dataset'), ... second_input=Input('/path/to/second/input/dataset'), ... ) ... def my_compute_function(ctx, first_input, second_input): ... # type: (pyspark.sql.DataFrame, pyspark.sql.DataFrame) -> pyspark.sql.DataFrame ... pd = first_input.filter(first_input.county == 'UK').toPandas() ... # データの一部に対してpandasの操作を行った後、PySpark DataFrameに戻す ... return ctx.spark_session.createDataFrame(pd)
ここで、@transform_df
デコレーターは、一連の入力と出力のパスを関数に割り当てます。これらのパスは、データセットの読み込みと書き込みに使用されます。この関数 my_compute_function
は、2つの PySpark DataFrame を引数として受け取り、1つの DataFrame を返します。
first_input.filter(first_input.county == 'UK').toPandas()
は、"UK"という値を持つ"county"カラムを持つ行だけをフィルタリングし、その結果をPandas DataFrameに変換します。
最後に、Pandas DataFrameをPySpark DataFrameに戻す操作を行っています。この操作は、ctx.spark_session.createDataFrame(pd)
によって実行されます。ここで ctx
は、現在のSparkセッションにアクセスできるコンテキストオブジェクトです。
transform_polars
transforms.api.transform_polars
(output, inputs)このデコレーターを使用するには、meta.yml
ファイルに foundry-transforms-lib-python
と polars
を run 依存関係として追加する必要があります。
transform_polars
デコレーターは、polars.DataFrame
↗ オブジェクトを受け取り、返す計算関数から Transform
オブジェクトを構築するために使用されます。このデコレーターは transform_df()
デコレーターに似ていますが、ユーザーコードには polars.DataFrame
↗ オブジェクトが渡されます。
transform_polars
デコレーターは @lightweight
デコレーターの薄いラッパーに過ぎません。これを使用すると、通常のトランスフォームのいくつかの機能を欠いた軽量トランスフォームが作成されます。軽量トランスフォームの詳細については、こちらのドキュメントをご覧ください。
Spark プロファイルやその他の一部のトランスフォーム機能は @lightweight
トランスフォームでは使用できないため、@transforms_polars
でも使用できません。
Copied!1 2 3 4 5 6 7 8 9
>>> @transform_polars( ... Output('ri.main.foundry.dataset.out'), # 名前のないOutput仕様 ... first_input=Input('ri.main.foundry.dataset.in1'), # 最初の入力データ ... second_input=Input('ri.main.foundry.dataset.in2'), # 二番目の入力データ ... ) ... def my_compute_function(ctx, first_input, second_input): ... # type: (polars.DataFrame, polars.DataFrame) -> polars.DataFrame ... # 'id' を基準にして最初のデータフレームと二番目のデータフレームを内部結合します ... return first_input.join(second_input, on='id', how="inner")