注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

Transforms Python API

Transforms Python APIは、Pipeline を構築するためのクラスとデコレータを提供します。このページには利用可能な関数に関する情報が含まれています。また、クラス についてさらに読むこともできます。

関数説明
configure([profile])トランスフォームの設定を変更するためのデコレータ。
incremental([require_incremental, ...])入力と出力をtransforms.api.incrementalの対応物に変換するデコレータ。
lightweight([cpu_cores, memory_mb, memory_gb, gpu_type, container_image, container_tag, container_shell_command])デコレータで修飾されたトランスフォームをコンテナトランスフォームで実行するためのデコレータ。
transform(ios)計算関数を Transform オブジェクトとしてラップする。
transform_df(output, inputs)ラップされた計算関数をデータフレームトランスフォームとして登録する。
transform_pandas(output, inputs)ラップされた計算関数をpandasトランスフォームとして登録する。
transform_polars(output, inputs)ラップされた計算関数をPolarsトランスフォームとして登録する。

configure

transforms.api.configure(profile=None, allowed_run_duration=None, run_as_user=False)

  • トランスフォームの設定を変更するためのデコレータ。
  • configure デコレータは Transform をラップするために使用されなければなりません。
Copied!
1 2 3 4 5 6 7 8 9 10 11 # `configure` デコレータは、実行時の設定を指定するために使用します。 # `profile` パラメータには、リソースのプロファイルを指定します。 # ここでは 'EXECUTOR_MEMORY_MEDIUM' が指定されており、これは実行者のメモリが中程度であることを意味します。 >>> @configure(profile=['EXECUTOR_MEMORY_MEDIUM']) # `transform` デコレータは、データの変換ロジックを指定するために使用します。 ... @transform(...) # `my_compute_function` という名前の関数を定義します。 ... def my_compute_function(...): ... pass

パラメーター

  • profile (str ↗ または List[str ↗], 任意)
    • 使用するトランスフォームプロファイルの名前。
  • allowed_run_duration (timedelta ↗, 任意)
    • このジョブが失敗する前にかかる時間の上限。慎重に使用してください。許可される持続時間を設定する際には、データの規模や形状の変化などの変数を考慮してください。持続時間は分単位でのみ指定可能です。重要: インクリメンタルトランスフォームに使用する場合は注意が必要です。スナップショットを実行する際に持続時間が大きく変わることがあります。
  • run_as_user (Boolean, 任意)
    • トランスフォームがユーザーの権限で実行されるかどうかを決定します。これを有効にすると、ジョブを実行するユーザーの権限によってジョブの動作が異なる場合があります。

発生する可能性のあるエラー


incremental

transforms.api.incremental(require_incremental=False, semantic_version=1, snapshot_inputs=None)

  • 入出力を transforms.api.incremental に変換するためのデコレーター。
  • incremental デコレーターは Transform をラップするために使用する必要があります。
Copied!
1 2 3 4 5 6 # @incremental() は関数の処理を逐次的に行うためのデコレーターです # @transform(...) は関数の出力を変換するためのデコレーターです >>> @incremental() ... @transform(...) ... def my_compute_function(...): # my_compute_function という名前の関数を定義しています ... pass # ここに関数の具体的な処理を書く

デコレーターは、出力データセットからビルド履歴を読み取り、最後のビルド時点での入力の状態を特定します。この情報を使用して、TransformInputTransformOutput、およびTransformContextオブジェクトを、それぞれのインクリメンタルな対応物であるIncrementalTransformInputIncrementalTransformOutput、およびIncrementalTransformContextに変換します。

このデコレーターは、transform_df()およびtransform_pandas()デコレーターをラップするためにも使用できます。これらのデコレーターは、引数なしで入力に対してdataframe()およびpandas()を呼び出し、PySparkおよびpandasのDataFrameオブジェクトを抽出します。したがって、使用される読み取りモードは常にaddedとなり、書き込みモードはincrementalデコレーターによって決定されます。非デフォルトモードのいずれかを読み書きするには、transform()デコレーターを使用する必要があります。

PySparkまたはpandasトランスフォームの追加出力行が追加入力行の関数のみである場合(APPENDの例を参照)、デフォルトモードは正しいインクリメンタルトランスフォームを生成します。

ユーザーのトランスフォームがSNAPSHOTトランザクションを含むデータセットを入力として取るが、それがトランスフォームのインクリメンタル実行能力に影響を与えない場合(リファレンステーブルなど)、トランスフォームを完全なSNAPSHOTとして実行しないようにするためにsnapshot_inputs引数を確認してください。

ユーザーのトランスフォームが複雑なロジック(結合、集計、distinctなど)を実行する場合、このデコレーターを使用する前にインクリメンタルドキュメントを読むことをお勧めします。

パラメーター

  • require_incremental (Boolean ↗, オプション)
    • Trueの場合、トランスフォームはインクリメンタルでない実行を拒否します。ただし、トランスフォームがこれまでに実行されたことがない場合は例外です。これは、すべての出力データセットにコミットされたトランザクションがないことに基づいて判断されます。
  • semantic_version (int ↗, オプション)
    • デフォルトは1です。この数値はトランスフォームのセマンティックな性質を表します。既存の出力を無効にするようなトランスフォームのロジックが変更された場合、この数値を変更する必要があります。この数値を変更すると、次回のトランスフォーム実行はインクリメンタルでない実行となります。
  • snapshot_inputs (文字列のリスト, オプション)
    • SNAPSHOTトランザクションがトランスフォームの現在の出力を無効にしない入力。たとえば、参照テーブルの更新は、以前に計算された出力が誤っていることを意味しません。これ以外のすべての入力に新しいデータが追加されているか、または新しいデータがない場合に、トランスフォームはインクリメンタルに実行されます。snapshot_inputsを読み取るとき、IncrementalTransformInputは入力データセットの現在のビューのみを公開します。
  • allow_retention (Boolean, オプション)
    • Trueの場合、foundry-retentionによる削除はインクリメンタル性を損なうことはありません。
  • strict_append (Boolean, オプション)
    • strict_appendパラメーターがTrueに設定されており、入力データセットがインクリメンタルである場合、基盤となるFoundryのトランザクションタイプはAPPENDに設定され、インクリメンタル書き込みにはAPPENDトランザクションが使用されます。既存のファイルを上書きしようとすると例外が発生します。 入力データセットがインクリメンタルでない場合、strict_appendSNAPSHOTとして実行されます。コードがAPPENDとしてインクリメンタルに実行されることを保証するためにrequire_incremental=Trueを使用する必要があります。既存のファイルを上書きしようとすると成功します。 書き込み操作は、ParquetサマリーメタデータやHadoopのSUCCESSファイルなどの補助ファイルを含め、ファイルを上書きしないことがあります。すべてのFoundry形式のインクリメンタル書き込みは、strict_appendモードをサポートする必要があります。
  • v2_semantics (Boolean, オプション)
    • デフォルトはFalseです。Trueに設定すると、v2インクリメンタルセマンティクスを使用します。v2とv1のインクリメンタルセマンティクスの間に動作の違いはなく、すべてのユーザーがこれをTrueに設定することをお勧めします。非カタログのインクリメンタル入力と出力は、v2セマンティクスを使用する場合にのみサポートされます。

例外


lightweight

transforms.api.lightweight(cpu_cores=2, memory_mb=None, memory_gb=16, gpu_type=None, container_image=None, container_tag=None, container_shell_command=None)

  • デコレーターは、デコレートされたトランスフォームをコンテナトランスフォームインフラストラクチャ上で実行するようにします。
  • lightweightデコレーターは、Transformをラップするために使用する必要があります。
Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 >>> @lightweight ... @transform( ... my_input=Input('/input'), # '/input'のパスからデータを読み込む ... my_output=Output('/output') # '/output'のパスへデータを書き出す ... ) ... def compute_func(my_input, my_output): ... my_output.write_pandas(my_input.pandas()) # pandas形式で出力を書き出す >>> @lightweight() ... @transform( ... my_input=Input('/input'), # '/input'のパスからデータを読み込む ... my_output=Output('/output') # '/output'のパスへデータを書き出す ... ) ... def compute_func(my_input, my_output): ... for file in my_input.filesystem().ls(): # '/input'の中の全ファイルを読み込む ... with my_input.filesystem().open(file.path) as f1: # ファイルを開く ... with my_output.filesystem().open(file.path, "w") as f2: # 書き出し用のファイルを開く ... f2.write(f1.read()) # ファイルを読み込み、出力へ書き出す >>> @lightweight(memory_gb=3.5) # メモリの使用量を3.5GBに制限 ... @transform_pandas( ... Output('/output'), # '/output'のパスへデータを書き出す ... my_input=Input('/input') # '/input'のパスからデータを読み込む ... ) ... def compute_func(my_input): ... return my_input # 入力をそのまま出力する >>> @lightweight(container_image='my-image', container_tag='0.0.1') # コンテナのイメージとタグを指定 ... @transform(my_output=Output('ri...my_output')) # 'ri...my_output'のパスへデータを書き出す ... def run_data_generator_executable(my_output): ... os.system('$USER_WORKING_DIR/data_generator') # データジェネレータの実行 ... my_output.write_table(pd.read_csv('data.csv')) # 'data.csv'を読み込み、テーブル形式で出力する

軽量トランスフォームは、単一ノードでSparkを使用せずに実行されるトランスフォームです。軽量トランスフォームは、小規模から中規模のデータセットに対して高速かつコスト効果の高い方法です。ただし、軽量トランスフォームは、通常のトランスフォームのAPIのサブセット(pandasやファイルシステムAPIなど)のみをサポートする一方で、データセットにアクセスするためのさらに多くのメソッドを提供します。軽量トランスフォームの詳細については、documentationをご覧ください。

このデコレーターを使用するには、foundry-transforms-lib-pythonを依存関係として追加する必要があります。

container_imagecontainer_tag、またはcontainer_shell_commandのいずれかが設定されている場合、container_imagecontainer_tagの両方が設定されている必要があります。container_shell_commandが設定されていない場合、デフォルトのエントリーポイントが使用され、Python環境をブートストラップし、トランスフォームで指定されたユーザーコードを実行します。

container_*引数を指定することは、BYOC(Bring-Your-Own-Container)ワークフローと呼ばれます。これにより、ユーザーのコードリポジトリからすべてのファイルが実行時に$USER_WORKING_DIR/user_code内で利用可能になり、Python環境も利用可能になります。

container_imagecontainer_tagで指定されたイメージは、コードリポジトリの製作物を利用したリポジトリから利用可能である必要があります。詳細については、BYOC documentationをご覧ください。

パラメーター

  • cpu_cores (float, optional)
    • トランスフォームに割り当てるCPUコアの数。小数にすることも可能です。デフォルトは2です。
  • memory_mb (float, optional)
    • コンテナに割り当てるメモリの量(MB)。memory_gbまたはmemory_mbのいずれかを指定できますが、両方を同時に指定することはできません。
  • memory_gb (float, optional)
    • コンテナに割り当てるメモリの量(GB)。デフォルトは16GBです。
  • gpu_type (str, optional_)
    • トランスフォームに割り当てるGPUのタイプ。
  • container_image (str, optional_)
    • トランスフォームのコンテナに使用するイメージ。
  • container_tag (str, optional_)
    • トランスフォームのコンテナに使用するイメージタグ。
  • container_shell_command (str, optional_)
    • コンテナ内で実行するシェルコマンド。指定されていない場合、コンテナが起動するとユーザーのトランスフォームを実行するデフォルト値が生成されます。

transform

transforms.api.transform(ios)

  • 計算関数をTransformオブジェクトとしてラップします。

  • transformデコレーターは、計算関数からTransformオブジェクトを構築するために使用されます。入力および出力に使用される名前は、ラップされた計算関数のパラメーター名である必要があります。計算時に、関数にはTransformInputおよびTransformOutputオブジェクトとして入力および出力が渡されます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 >>> @transform( ... first_input=Input('/path/to/first/input/dataset'), # 最初の入力データセットへのパス ... second_input=Input('/path/to/second/input/dataset'), # 二番目の入力データセットへのパス ... first_output=Output('/path/to/first/output/dataset'), # 最初の出力データセットへのパス ... second_output=Output('/path/to/second/output/dataset'), # 二番目の出力データセットへのパス ... ) ... def my_compute_function(first_input, second_input, first_output, second_output): ... # type: (TransformInput, TransformInput, TransformOutput, TransformOutput) -> None ... # 関数の型ヒント:入力はTransformInput、出力はTransformOutput、戻り値はなし ... first_output.write_dataframe(first_input.data_dataframe()) # 最初の入力データセットからデータフレームを取得し、最初の出力データセットに書き込む ... second_output.write_dataframe(second_input.data_dataframe()) # 二番目の入力データセットからデータフレームを取得し、二番目の出力データセットに書き込む

パラメーター

  • ios (Input または Output)
    • kwargs (キーワード引数) は、名前付きの Input および Output 仕様で構成されます。

compute 関数は、データを出力先に書き込む役割を担っています。

必要に応じて、TransformContext および対応する SparkSession にも compute 関数内でアクセスできます。これを使用して、空のデータフレームを作成したり、Spark の設定を適用したりすることができます。可能な限り、SparkSession オブジェクトを使用して Spark の設定値を設定するのではなく、既存のデフォルト Spark プロファイルを使用することをお勧めします。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 >>> @transform( ... output=Output('/path/to/first/output/dataset'), ... ) ... def my_compute_function(ctx, output): ... # type: (TransformContext, TransformOutput) -> None ... ... # この例では、Sparkセッションを使用して空のデータフレームを作成します。 ... columns = [ ... StructField("col_a", StringType(), True) ... ] ... empty_df = ctx.spark_session.createDataFrame([], schema=StructType(columns)) ... ... output.write_dataframe(empty_df)

transform_df

transforms.api.transform_df(output, inputs)

  • ラップされた計算関数をデータフレーム トランスフォームとして登録します。
  • transform_df デコレーターは、pyspark.sql.DataFrame オブジェクトを受け取り、返す計算関数から Transform オブジェクトを構築するために使用されます。 transform() デコレーターと同様に、入力名は計算関数のパラメーター名になります。ただし、transform_df は位置引数として単一の Output スペックのみを受け入れます。計算関数の戻り値も DataFrame であり、単一の出力データセットに自動的に書き出されます。
Copied!
1 2 3 4 5 6 7 8 9 >>> @transform_df( ... Output('/path/to/output/dataset'), # 名前のないOutput spec ... first_input=Input('/path/to/first/input/dataset'), # 最初の入力データセット ... second_input=Input('/path/to/second/input/dataset'), # 二番目の入力データセット ... ) ... def my_compute_function(first_input, second_input): ... # type: (pyspark.sql.DataFrame, pyspark.sql.DataFrame) -> pyspark.sql.DataFrame ... # 最初のデータフレームと二番目のデータフレームを結合します。 ... return first_input.union(second_input)

パラメーター

  • output (Output)
    • トランスフォームの単一の Output スペック。
  • inputs (Input)
    • 名前付きの Input スペックで構成されるkwargs(キーワード引数)。

オプションとして、TransformContext と対応する SparkSession も計算関数内でアクセスできます。これを使用して空のデータフレームを作成したり、Spark の設定を適用したりなどができます。可能な限り、SparkSession オブジェクト経由で Spark config 値を設定する代わりに、既存のデフォルト Spark プロファイルを使用することをお勧めします。

Copied!
1 2 3 4 5 6 7 8 9 10 11 >>> @transform_df( ... Output('/path/to/output/dataset') ... ) ... def my_compute_function(ctx): ... # type: (TransformContext) -> pyspark.sql.DataFrame ... ... # この例では、Sparkセッションを使用して空のデータフレームを作成しています。 ... columns = [ ... StructField("col_a", StringType(), True) ... ] ... return ctx.spark_session.createDataFrame([], schema=StructType(columns))

transform_pandas

transforms.api.transform_pandas(output, inputs)

  • ラップされた計算関数を pandas トランスフォームとして登録する。

pandas ライブラリを使用するには、meta.yml ファイルに pandasrun 依存関係として追加する必要があります。

transform_pandas デコレータは、pandas.DataFrame オブジェクトを受け取り返す計算関数から Transform オブジェクトを構築するために使用されます。このデコレータは transform_df() デコレータと似ていますが、計算の前に pyspark.sql.DataFrame オブジェクトが pandas.DataFrame オブジェクトに変換され、計算後に再び変換されます。

Copied!
1 2 3 4 5 6 7 8 9 >>> @transform_pandas( ... Output('/path/to/output/dataset'), # 名前がないOutputの仕様 ... first_input=Input('/path/to/first/input/dataset'), # 最初の入力データセット ... second_input=Input('/path/to/second/input/dataset'), # 二番目の入力データセット ... ) ... def my_compute_function(first_input, second_input): ... # type: (pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame ... # 最初の入力データフレームと二番目の入力データフレームを連結する ... return first_input.concat(second_input)

transform_pandas はメモリに収まるデータセットでのみ使用する必要があることに注意してください。より大きなデータセットを pandas に変換する前にフィルター処理したい場合は、transform_df() デコレーターと pyspark.sql.SparkSession.createDataFrame() メソッドを使用してトランスフォームを記述する必要があります。

Copied!
1 2 3 4 5 6 7 8 9 10 >>> @transform_df( ... Output('/path/to/output/dataset'), # 名前のないOutput spec ... first_input=Input('/path/to/first/input/dataset'), ... second_input=Input('/path/to/second/input/dataset'), ... ) ... def my_compute_function(ctx, first_input, second_input): ... # type: (pyspark.sql.DataFrame, pyspark.sql.DataFrame) -> pyspark.sql.DataFrame ... pd = first_input.filter(first_input.county == 'UK').toPandas() ... # データの一部に対してpandasの操作を行った後、PySpark DataFrameに戻す ... return ctx.spark_session.createDataFrame(pd)

ここで、@transform_df デコレーターは、一連の入力と出力のパスを関数に割り当てます。これらのパスは、データセットの読み込みと書き込みに使用されます。この関数 my_compute_function は、2つの PySpark DataFrame を引数として受け取り、1つの DataFrame を返します。

first_input.filter(first_input.county == 'UK').toPandas() は、"UK"という値を持つ"county"カラムを持つ行だけをフィルタリングし、その結果をPandas DataFrameに変換します。

最後に、Pandas DataFrameをPySpark DataFrameに戻す操作を行っています。この操作は、ctx.spark_session.createDataFrame(pd) によって実行されます。ここで ctx は、現在のSparkセッションにアクセスできるコンテキストオブジェクトです。

パラメーター

  • output (Output)
    • トランスフォームの単一の Output スペック。
  • inputs (Input)
    • 名前付き Input スペックで構成された kwargs (キーワード引数)。

transform_polars

transforms.api.transform_polars(output, inputs)

  • ラップされた計算関数を Polars トランスフォームとして登録します。

このデコレーターを使用するには、meta.yml ファイルに foundry-transforms-lib-pythonpolarsrun 依存関係として追加する必要があります。

transform_polars デコレーターは、polars.DataFrame オブジェクトを受け取り、返す計算関数から Transform オブジェクトを構築するために使用されます。このデコレーターは transform_df() デコレーターに似ていますが、ユーザーコードには polars.DataFrame オブジェクトが渡されます。

transform_polars デコレーターは @lightweight デコレーターの薄いラッパーに過ぎません。これを使用すると、通常のトランスフォームのいくつかの機能を欠いた軽量トランスフォームが作成されます。軽量トランスフォームの詳細については、こちらのドキュメントをご覧ください。

Spark プロファイルやその他の一部のトランスフォーム機能@lightweight トランスフォームでは使用できないため、@transforms_polars でも使用できません。

Copied!
1 2 3 4 5 6 7 8 9 >>> @transform_polars( ... Output('ri.main.foundry.dataset.out'), # 名前のないOutput仕様 ... first_input=Input('ri.main.foundry.dataset.in1'), # 最初の入力データ ... second_input=Input('ri.main.foundry.dataset.in2'), # 二番目の入力データ ... ) ... def my_compute_function(ctx, first_input, second_input): ... # type: (polars.DataFrame, polars.DataFrame) -> polars.DataFrame ... # 'id' を基準にして最初のデータフレームと二番目のデータフレームを内部結合します ... return first_input.join(second_input, on='id', how="inner")

パラメーター

  • output (Output)
    • トランスフォームの単一の Output スペック。
  • inputs (Input)
    • 名前付きの Input スペックで構成されるkwargs(キーワード引数)。