注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

Transforms Python API

Transforms Python API は、Pipeline を構築するためのクラスとデコレーターを提供します。このページでは、利用可能な関数について説明しています。また、クラスに関する詳細もご覧いただけます。

関数説明
configure([profile])トランスフォームの設定を変更するデコレーター。
incremental([require_incremental, ...])入力と出力を transforms.api.incremental の対応物に変換するデコレーター。
lightweight([cpu_cores, memory_mb, memory_gb, gpu_type, container_image, container_tag, container_shell_command])デコレートされたトランスフォームをコンテナートランスフォームで実行するようにするデコレーター。
transform(ios)計算関数を Transform オブジェクトとしてラップします。
transform_df(output, inputs)ラップされた計算関数をデータフレームトランスフォームとして登録します。
transform_pandas(output, inputs)ラップされた計算関数を Pandas トランスフォームとして登録します。
transform_polars(output, inputs)ラップされた計算関数を Polars トランスフォームとして登録します。

configure

transforms.api.configure(profile=None, allowed_run_duration=None, run_as_user=False)

  • トランスフォームの設定を変更するデコレーター。
  • configure デコレーターは、Transform をラップするために使用される必要があります:
Copied!
1 2 3 4 5 # 日本語のコメントを追加します >>> @configure(profile=['EXECUTOR_MEMORY_MEDIUM']) # 実行者のメモリサイズを中に設定 ... @transform(...) # 変換処理を行うデコレータ ... def my_compute_function(...): # 計算を行う関数 ... pass # 実装内容

パラメーター

  • profile (str または List[str], オプション)
    • 使用する変換プロファイルの名前。
  • allowed_run_duration (timedelta, オプション)
    • このジョブが失敗する前にかかる時間の上限。慎重に使用してください。許容される期間を設定する際には、データのスケールや形状の変化などの変数を考慮してください。期間は分単位でのみ精度があります。重要: インクリメンタルな変換で使用する際は注意してください。スナップショットの実行時に期間が大幅に変わる可能性があります。
  • run_as_user (boolean, オプション)
    • 変換がユーザー権限で実行されるかどうかを決定します。有効にすると、ジョブの実行ユーザーの権限に応じて、ジョブの動作が異なります。

例外

  • TypeError
    • ラップされたオブジェクトが Transform オブジェクトでない場合。

incremental

transforms.api.incremental(require_incremental=False, semantic_version=1, snapshot_inputs=None)

  • 入力と出力をそれぞれの transforms.api.incremental に変換するデコレータ。
  • incremental デコレータは、Transform をラップするために使用する必要があります:
Copied!
1 2 3 4 >>> @incremental() # インクリメンタル処理を行うデコレータ ... @transform(...) # データ変換を行うデコレータ ... def my_compute_function(...): # 計算処理を行う関数 ... pass # 実際の処理をここに記述

デコレータは、出力データセットからビルド履歴を読み取り、最後のビルド時の入力の状態を決定します。この情報は、TransformInputTransformOutput、そしてTransformContextオブジェクトをそれぞれのインクリメンタルな対応物であるIncrementalTransformInputIncrementalTransformOutput、そしてIncrementalTransformContextに変換するために使用されます。

このデコレータはまた、transform_df()transform_pandas() デコレータをラップするためにも使用することができます。これらのデコレータは、入力に対して引数なしで dataframe()pandas() を呼び出し、PySpark と Pandas DataFrame オブジェクトを抽出します。これは、使用される読み込みモードが常に added であることを意味し、書き込みモードは incremental デコレータによって決定されます。非デフォルトモードのいずれかを読み込んだり書き込んだりするためには、transform() デコレータを使用する必要があります。

ユーザーの PySpark または Pandas の変換により追加された出力行が追加された入力行の関数だけである場合(appendの例に従って)、デフォルトモードは正しいインクリメンタル変換を生成します。

ユーザーの変換が SNAPSHOT トランザクションを持つデータセットを入力として取り、それがインクリメンタルに変換を実行する能力を変えない場合(参照テーブルのような場合)、完全な SNAPSHOT として変換を実行するのを避けるために snapshot_inputs 引数をレビューしてください。

ユーザーの変換が複雑なロジック(結合、集約、一意性などを含む)を実行する場合、このデコレータを使用する前に incremental documentation を読むことをお勧めします。

パラメーター

  • require_incremental (bool, optional)
    • Trueの場合、変換はすべての出力データセットがコミットされたトランザクションを持たない限り、非インクリメンタルに実行を拒否します。
  • semantic_version (int, optional)
    • デフォルトは1です。この数字は変換の意味的な性質を表します。変換のロジックが既存の出力を無効にするような方法で変更されるたびに、この数字を変更する必要があります。この数字を変更すると、変換の次の実行は非インクリメンタルで実行されます。
  • snapshot_inputs (str のリスト, optional)
    • SNAPSHOTトランザクションが現在の変換の出力を無効にしない入力です。例えば、ルックアップテーブルの更新が以前に計算された出力が間違っていることを意味するわけではありません。変換は、これらを除くすべての入力が新たに追加されたデータしか持っていない場合、または新たに追加されたデータがない場合にインクリメンタルに実行されます。snapshot_inputs を読み取るとき、IncrementalTransformInputは入力データセットの現在のビューのみを公開します。
  • allow_retention (bool, optional)
    • Trueの場合、foundry-retentionによる削除はインクリメンタリティを壊しません。
  • strict_append (bool, optional)
    • Trueの場合、基礎となる foundry トランザクションタイプは APPEND になります。書き込み操作は、Parquet のサマリーメタデータや Hadoop SUCCESS ファイルなどの補助的なファイルを上書きしてはならないことに注意してください。すべての Foundry 形式のインクリメンタルな書き込みはこのモードをサポートする必要があります。

例外


lightweight

transforms.api.lightweight(cpu_cores=2, memory_mb=None, memory_gb=16, gpu_type=None, container_image=None, container_tag=None, container_shell_command=None)

  • デコレータは、装飾された変換がコンテナ変換インフラストラクチャの上で実行するようにします。
  • lightweight デコレータは Transform をラップするために使用する必要があります:
Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 # Pythonのコードに日本語のコメントを追加します。 >>> @lightweight ... @transform( ... my_input=Input('/input'), # 入力パスを指定します ... my_output=Output('/output') # 出力パスを指定します ... ) ... def compute_func(my_input, my_output): # 関数を定義します ... my_output.write_pandas(my_input.pandas()) # pandasのデータフレームを出力します >>> @lightweight() ... @transform( ... my_input=Input('/input'), # 入力パスを指定します ... my_output=Output('/output') # 出力パスを指定します ... ) ... def compute_func(my_input, my_output): # 関数を定義します ... for file in my_input.filesystem().ls(): # 入力フォルダ内の全ファイルをループします ... with my_input.filesystem().open(file.path) as f1: # ファイルを開きます ... with my_output.filesystem().open(file.path, "w") as f2: # 出力ファイルを開きます ... f2.write(f1.read()) # 入力ファイルの内容を出力ファイルに書き込みます >>> @lightweight(memory_gb=3.5) # メモリ容量を指定します ... @transform_pandas( ... Output('/output'), # 出力パスを指定します ... my_input=Input('/input') # 入力パスを指定します ... ) ... def compute_func(my_input): # 関数を定義します ... return my_input # 入力をそのまま出力します >>> @lightweight(container_image='my-image', container_tag='0.0.1') # コンテナイメージとタグを指定します ... @transform(my_output=Output('ri...my_output')) # 出力パスを指定します ... def run_data_generator_executable(my_output): # 関数を定義します ... os.system('$USER_WORKING_DIR/data_generator') # データ生成器を実行します ... my_output.write_table(pd.read_csv('data.csv')) # CSVファイルを読み込み、テーブルとして出力します

軽量トランスフォームは、単一のノード上でSparkを使用せずに実行されるトランスフォームです。軽量トランスフォームは、小から中規模のデータセットに対して、より高速でコスト効果が高いです。ただし、軽量トランスフォームは、通常のトランスフォームのAPIの一部しかサポートしていませんが、PandasやファイルシステムAPIを含む一方で、データセットにアクセスするためのメソッドをより多く提供しています。軽量トランスフォームの詳細については、軽量ドキュメンテーションをご覧ください。

このデコレータを使用するには、foundry-transforms-lib-pythonを依存関係として追加する必要があります。

container_imagecontainer_tagcontainer_shell_commandのいずれかが設定されている場合、container_imagecontainer_tagの両方が設定されている必要があります。container_shell_commandが設定されていない場合、デフォルトのエントリーポイントが使用され、トランスフォームで指定されたユーザーコードをPython環境で実行します。

container_*引数を指定することは、自己提供コンテナ(BYOC)ワークフローと呼ばれています。これにより、ユーザーのコードリポジトリのすべてのファイルがランタイム時に$USER_WORKING_DIR/user_code内で利用可能になり、Python環境も利用可能になります。

container_imagecontainer_tagで指定されたイメージは、コードリポジトリの作成物バックアップリポジトリから利用可能でなければなりません。詳細については、BYOCドキュメンテーションをご覧ください。

パラメーター

  • cpu_cores (float, optional)
    • トランスフォームに割り当てるCPUコアの数で、分数でもかまいません。デフォルトは2です。
  • memory_mb (float, optional)
    • コンテナに割り当てるメモリの量をMBで指定します。memory_gbまたはmemory_mbのどちらか一方を指定できますが、両方を指定することはできません。
  • memory_gb (float, optional)
    • コンテナに割り当てるメモリの量をGBで指定します。デフォルトは16GBです。
  • gpu_type (str, optional_)
    • トランスフォームに割り当てるGPUのタイプ。
  • container_image (str, optional_)
    • トランスフォームのコンテナで使用するイメージ。
  • container_tag (str, optional_)
    • トランスフォームのコンテナで使用するイメージタグ。
  • container_shell_command (str, optional_)
    • コンテナ内で実行するシェルコマンド。指定しなかった場合、デフォルトの値が生成され、コンテナが開始されたらユーザーのトランスフォームが実行されます。

transform

transforms.api.transform(ios)

  • 計算機能をトランスフォームオブジェクトとしてラップします。

  • transformデコレータは、計算関数からTransformオブジェクトを構築するために使用されます。入力と出力に使用する名前は、ラップされた計算関数のパラメーター名であるべきです。計算時には、関数にはTransformInputオブジェクトとTransformOutputオブジェクトとして入力と出力が渡されます。

Copied!
1 2 3 4 5 6 7 8 9 10 >>> @transform( ... first_input=Input('/path/to/first/input/dataset'), # 最初の入力データセットへのパス ... second_input=Input('/path/to/second/input/dataset'), # 2番目の入力データセットへのパス ... first_output=Output('/path/to/first/output/dataset'), # 最初の出力データセットへのパス ... second_output=Output('/path/to/second/output/dataset'), # 2番目の出力データセットへのパス ... ) ... def my_compute_function(first_input, second_input, first_output, second_output): ... # type: (TransformInput, TransformInput, TransformOutput, TransformOutput) -> None ... first_output.write_dataframe(first_input.dataframe()) # 最初の入力データセットを最初の出力データセットに書き込む ... second_output.write_dataframe(second_input.dataframe()) # 2番目の入力データセットを2番目の出力データセットに書き込む

パラメーター

  • ios (Input または Output)
    • 名前付きの InputOutput の仕様を含む kwargs (キーワード引数)。

計算機能はデータをその出力に書き込む責任があります。

必要に応じて、TransformContext と対応する SparkSession も計算機能内でアクセスできます。これを使用して空のデータフレームを作成したり、Spark の設定を適用したりなどが可能です。可能な場合は、SparkSession オブジェクトを介して Spark の設定値を設定するよりも、既存のデフォルト Spark プロファイルを使用することをお勧めします。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 >>> @transform( ... output=Output('/path/to/first/output/dataset'), ... ) ... def my_compute_function(ctx, output): ... # type: (TransformContext, TransformOutput) -> None ... ... # この例では、Sparkセッションを使用して空のデータフレームを作成します。 ... columns = [ ... StructField("col_a", StringType(), True) ... ] ... empty_df = ctx.spark_session.createDataFrame([], schema=StructType(columns)) ... ... output.write_dataframe(empty_df)

transform_df

transforms.api.transform_df(output, inputs)

  • ラップされた計算関数をデータフレーム変換として登録します。
  • transform_df デコレータは、pyspark.sql.DataFrame オブジェクトを受け取り、返す計算関数から Transform オブジェクトを構築するために使用されます。transform() デコレータと同様に、入力名は計算関数のパラメーター名になります。ただし、transform_df は位置引数として単一の Output spec のみを受け入れます。計算関数の戻り値もまた DataFrameであり、自動的に単一の出力データセットに書き出されます。
Copied!
1 2 3 4 5 6 7 8 9 >>> @transform_df( ... Output('/path/to/output/dataset'), # 名前のないOutput spec ... first_input=Input('/path/to/first/input/dataset'), ... second_input=Input('/path/to/second/input/dataset'), ... ) ... def my_compute_function(first_input, second_input): ... # type: (pyspark.sql.DataFrame, pyspark.sql.DataFrame) -> pyspark.sql.DataFrame ... # 第一の入力データと第二の入力データを結合する ... return first_input.union(second_input)

パラメーター

  • output (Output)
    • トランスフォームの単一の Output スペック。
  • inputs (Input)
    • 名前付きの Input スペックで構成されるkwargs(キーワード引数)。

オプションとして、TransformContext および対応する SparkSession も compute 関数内でアクセス可能です。これを利用して空のデータフレームを作成したり、Spark の設定を適用したりすることができます。可能な限り、既存のデフォルトの Spark プロファイルを使用し、SparkSession オブジェクト経由で Spark の設定値を設定するのではなく、推奨します。

Copied!
1 2 3 4 5 6 7 8 9 10 11 >>> @transform_df( ... Output('/path/to/output/dataset') ... ) ... def my_compute_function(ctx): ... # type: (TransformContext) -> pyspark.sql.DataFrame ... ... # この例では、Sparkセッションを使用して空のデータフレームを作成します。 ... columns = [ ... StructField("col_a", StringType(), True) ... ] ... return ctx.spark_session.createDataFrame([], schema=StructType(columns))

transform_pandas

transforms.api.transform_pandas(outputinputs)

  • ラップされた計算関数をPandas変換として登録します。

Pandasライブラリを使用するには、meta.ymlファイルで pandas実行依存関係に追加する必要があります。

transform_pandasデコレータは、pandas.DataFrame オブジェクトを受け取り、返す計算関数から Transform オブジェクトを構築するために使用されます。このデコレータは、transform_df() デコレータに似ていますが、計算の前に pyspark.sql.DataFrame オブジェクトが pandas.DataFrame オブジェクトに変換され、計算後に戻されます。

Copied!
1 2 3 4 5 6 7 8 >>> @transform_pandas( ... Output('/path/to/output/dataset'), # 名前のないOutput仕様 ... first_input=Input('/path/to/first/input/dataset'), # 最初の入力データセット ... second_input=Input('/path/to/second/input/dataset'), # 二番目の入力データセット ... ) ... def my_compute_function(first_input, second_input): ... # type: (pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame ... return first_input.concat(second_input) # 二つの入力データフレームを結合

transform_pandasは、メモリに収まるデータセットにのみ使用する必要があります。Pandasに変換する前に最初にフィルター処理したい大量のデータセットがある場合は、transform_df() デコレータと pyspark.sql.SparkSession.createDataFrame() メソッドを使用して変換を記述する必要があります。

Copied!
1 2 3 4 5 6 7 8 9 10 >>> @transform_df( ... Output('/path/to/output/dataset'), # 名前のないOutput spec ... first_input=Input('/path/to/first/input/dataset'), # 最初の入力データセットのパス ... second_input=Input('/path/to/second/input/dataset'), # 二番目の入力データセットのパス ... ) ... def my_compute_function(ctx, first_input, second_input): ... # type: (pyspark.sql.DataFrame, pyspark.sql.DataFrame) -> pyspark.sql.DataFrame ... pd = first_input.filter(first_input.county == 'UK').toPandas() # データの一部にpandas操作を適用し、それをPySpark DataFrameに変換します ... # 'UK'というカウンティを持つ行だけをフィルタリングして、pandasのDataFrameに変換します ... return ctx.spark_session.createDataFrame(pd) # pandasのDataFrameをPySparkのDataFrameに戻します

パラメーター

  • output (Output)
    • トランスフォームの単一の Output 仕様。
  • inputs (Input)
    • 名前付き Input 仕様からなる kwargs (キーワード引数)。

transform_polars

transforms.api.transform_polars(output, inputs)

  • ラップされた計算関数を Polars トランスフォームとして登録します。

このデコレーターを使用するには、 meta.yml ファイルに foundry-transforms-lib-pythonpolars実行時依存関係として追加する必要があります。

transform_polars デコレーターは、 polars.DataFrame オブジェクトを受け取り返す計算関数から Transform オブジェクトを構築するために使用されます。このデコレーターは transform_df() デコレーターに似ていますが、ユーザーコードは polars.DataFrame オブジェクトが渡されます。

transform_polars デコレーターは @lightweight デコレーターの薄いラッパーにすぎません。これを使用すると、通常のトランスフォームの一部の機能が欠けている軽量トランスフォームが作成されます。軽量トランスフォームの詳細については、 軽量ドキュメンテーションをご覧ください。

Spark プロファイルと 一部の他のトランスフォーム機能@lightweight トランスフォームで使用できないため、 @transforms_polars でも使用できません。

Copied!
1 2 3 4 5 6 7 8 9 >>> @transform_polars( ... Output('ri.main.foundry.dataset.out'), # 名前のない Output spec ... first_input=Input('ri.main.foundry.dataset.in1'), # 最初の入力データ ... second_input=Input('ri.main.foundry.dataset.in2'), # 二番目の入力データ ... ) ... def my_compute_function(ctx, first_input, second_input): ... # type: (polars.DataFrame, polars.DataFrame) -> polars.DataFrame ... # 'id' を基に、first_input と second_input を結合する ... return first_input.join(second_input, on='id', how="inner")

パラメーター

  • output (Output)
    • トランスフォーム用の単一の Output 仕様。
  • inputs (Input)
    • 名前付きの Input 仕様で構成される kwargs(キーワード引数)。