注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
Transforms Python API は、Pipeline
を構築するためのクラスとデコレーターを提供します。このページでは、利用可能な関数について説明しています。また、クラスに関する詳細もご覧いただけます。
関数 | 説明 |
---|---|
configure ([profile]) | トランスフォームの設定を変更するデコレーター。 |
incremental ([require_incremental, ...]) | 入力と出力を transforms.api.incremental の対応物に変換するデコレーター。 |
lightweight ([cpu_cores, memory_mb, memory_gb, gpu_type, container_image, container_tag, container_shell_command]) | デコレートされたトランスフォームをコンテナートランスフォームで実行するようにするデコレーター。 |
transform (ios) | 計算関数を Transform オブジェクトとしてラップします。 |
transform_df (output, inputs) | ラップされた計算関数をデータフレームトランスフォームとして登録します。 |
transform_pandas (output, inputs) | ラップされた計算関数を Pandas トランスフォームとして登録します。 |
transform_polars (output, inputs) | ラップされた計算関数を Polars トランスフォームとして登録します。 |
configure
transforms.api.configure
(profile=None, allowed_run_duration=None, run_as_user=False)configure
デコレーターは、Transform
をラップするために使用される必要があります:Copied!1 2 3 4 5
# 日本語のコメントを追加します >>> @configure(profile=['EXECUTOR_MEMORY_MEDIUM']) # 実行者のメモリサイズを中に設定 ... @transform(...) # 変換処理を行うデコレータ ... def my_compute_function(...): # 計算を行う関数 ... pass # 実装内容
incremental
transforms.api.incremental
(require_incremental=False, semantic_version=1, snapshot_inputs=None)transforms.api.incremental
に変換するデコレータ。incremental
デコレータは、Transform
をラップするために使用する必要があります:Copied!1 2 3 4
>>> @incremental() # インクリメンタル処理を行うデコレータ ... @transform(...) # データ変換を行うデコレータ ... def my_compute_function(...): # 計算処理を行う関数 ... pass # 実際の処理をここに記述
デコレータは、出力データセットからビルド履歴を読み取り、最後のビルド時の入力の状態を決定します。この情報は、TransformInput
、TransformOutput
、そしてTransformContext
オブジェクトをそれぞれのインクリメンタルな対応物であるIncrementalTransformInput
、IncrementalTransformOutput
、そしてIncrementalTransformContext
に変換するために使用されます。
このデコレータはまた、transform_df()
と transform_pandas()
デコレータをラップするためにも使用することができます。これらのデコレータは、入力に対して引数なしで dataframe()
と pandas()
を呼び出し、PySpark と Pandas DataFrame オブジェクトを抽出します。これは、使用される読み込みモードが常に added
であることを意味し、書き込みモードは incremental
デコレータによって決定されます。非デフォルトモードのいずれかを読み込んだり書き込んだりするためには、transform()
デコレータを使用する必要があります。
ユーザーの PySpark または Pandas の変換により追加された出力行が追加された入力行の関数だけである場合(appendの例に従って)、デフォルトモードは正しいインクリメンタル変換を生成します。
ユーザーの変換が SNAPSHOT
トランザクションを持つデータセットを入力として取り、それがインクリメンタルに変換を実行する能力を変えない場合(参照テーブルのような場合)、完全な SNAPSHOT
として変換を実行するのを避けるために snapshot_inputs
引数をレビューしてください。
ユーザーの変換が複雑なロジック(結合、集約、一意性などを含む)を実行する場合、このデコレータを使用する前に incremental documentation を読むことをお勧めします。
SNAPSHOT
トランザクションが現在の変換の出力を無効にしない入力です。例えば、ルックアップテーブルの更新が以前に計算された出力が間違っていることを意味するわけではありません。変換は、これらを除くすべての入力が新たに追加されたデータしか持っていない場合、または新たに追加されたデータがない場合にインクリメンタルに実行されます。snapshot_inputs を読み取るとき、IncrementalTransformInput
は入力データセットの現在のビューのみを公開します。TypeError
Transform object
ではない場合。KeyError
Transform object
上に存在しない場合。lightweight
transforms.api.lightweight
(cpu_cores=2, memory_mb=None, memory_gb=16, gpu_type=None, container_image=None, container_tag=None, container_shell_command=None)lightweight
デコレータは Transform
をラップするために使用する必要があります:Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
# Pythonのコードに日本語のコメントを追加します。 >>> @lightweight ... @transform( ... my_input=Input('/input'), # 入力パスを指定します ... my_output=Output('/output') # 出力パスを指定します ... ) ... def compute_func(my_input, my_output): # 関数を定義します ... my_output.write_pandas(my_input.pandas()) # pandasのデータフレームを出力します >>> @lightweight() ... @transform( ... my_input=Input('/input'), # 入力パスを指定します ... my_output=Output('/output') # 出力パスを指定します ... ) ... def compute_func(my_input, my_output): # 関数を定義します ... for file in my_input.filesystem().ls(): # 入力フォルダ内の全ファイルをループします ... with my_input.filesystem().open(file.path) as f1: # ファイルを開きます ... with my_output.filesystem().open(file.path, "w") as f2: # 出力ファイルを開きます ... f2.write(f1.read()) # 入力ファイルの内容を出力ファイルに書き込みます >>> @lightweight(memory_gb=3.5) # メモリ容量を指定します ... @transform_pandas( ... Output('/output'), # 出力パスを指定します ... my_input=Input('/input') # 入力パスを指定します ... ) ... def compute_func(my_input): # 関数を定義します ... return my_input # 入力をそのまま出力します >>> @lightweight(container_image='my-image', container_tag='0.0.1') # コンテナイメージとタグを指定します ... @transform(my_output=Output('ri...my_output')) # 出力パスを指定します ... def run_data_generator_executable(my_output): # 関数を定義します ... os.system('$USER_WORKING_DIR/data_generator') # データ生成器を実行します ... my_output.write_table(pd.read_csv('data.csv')) # CSVファイルを読み込み、テーブルとして出力します
軽量トランスフォームは、単一のノード上でSparkを使用せずに実行されるトランスフォームです。軽量トランスフォームは、小から中規模のデータセットに対して、より高速でコスト効果が高いです。ただし、軽量トランスフォームは、通常のトランスフォームのAPIの一部しかサポートしていませんが、PandasやファイルシステムAPIを含む一方で、データセットにアクセスするためのメソッドをより多く提供しています。軽量トランスフォームの詳細については、軽量ドキュメンテーションをご覧ください。
このデコレータを使用するには、foundry-transforms-lib-python
を依存関係として追加する必要があります。
container_image
、container_tag
、container_shell_command
のいずれかが設定されている場合、container_image
とcontainer_tag
の両方が設定されている必要があります。container_shell_command
が設定されていない場合、デフォルトのエントリーポイントが使用され、トランスフォームで指定されたユーザーコードをPython環境で実行します。
container_*引数を指定することは、自己提供コンテナ(BYOC)ワークフローと呼ばれています。これにより、ユーザーのコードリポジトリのすべてのファイルがランタイム時に$USER_WORKING_DIR/user_code
内で利用可能になり、Python環境も利用可能になります。
container_image
とcontainer_tag
で指定されたイメージは、コードリポジトリの作成物バックアップリポジトリから利用可能でなければなりません。詳細については、BYOCドキュメンテーションをご覧ください。
memory_gb
またはmemory_mb
のどちらか一方を指定できますが、両方を指定することはできません。transform
transforms.api.transform
(ios)計算機能をトランスフォームオブジェクトとしてラップします。
transform
デコレータは、計算関数からTransform
オブジェクトを構築するために使用されます。入力と出力に使用する名前は、ラップされた計算関数のパラメーター名であるべきです。計算時には、関数にはTransformInput
オブジェクトとTransformOutput
オブジェクトとして入力と出力が渡されます。
Copied!1 2 3 4 5 6 7 8 9 10
>>> @transform( ... first_input=Input('/path/to/first/input/dataset'), # 最初の入力データセットへのパス ... second_input=Input('/path/to/second/input/dataset'), # 2番目の入力データセットへのパス ... first_output=Output('/path/to/first/output/dataset'), # 最初の出力データセットへのパス ... second_output=Output('/path/to/second/output/dataset'), # 2番目の出力データセットへのパス ... ) ... def my_compute_function(first_input, second_input, first_output, second_output): ... # type: (TransformInput, TransformInput, TransformOutput, TransformOutput) -> None ... first_output.write_dataframe(first_input.dataframe()) # 最初の入力データセットを最初の出力データセットに書き込む ... second_output.write_dataframe(second_input.dataframe()) # 2番目の入力データセットを2番目の出力データセットに書き込む
計算機能はデータをその出力に書き込む責任があります。
必要に応じて、TransformContext と対応する SparkSession も計算機能内でアクセスできます。これを使用して空のデータフレームを作成したり、Spark の設定を適用したりなどが可能です。可能な場合は、SparkSession オブジェクトを介して Spark の設定値を設定するよりも、既存のデフォルト Spark プロファイルを使用することをお勧めします。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13
>>> @transform( ... output=Output('/path/to/first/output/dataset'), ... ) ... def my_compute_function(ctx, output): ... # type: (TransformContext, TransformOutput) -> None ... ... # この例では、Sparkセッションを使用して空のデータフレームを作成します。 ... columns = [ ... StructField("col_a", StringType(), True) ... ] ... empty_df = ctx.spark_session.createDataFrame([], schema=StructType(columns)) ... ... output.write_dataframe(empty_df)
transform_df
transforms.api.transform_df
(output, inputs)transform_df
デコレータは、pyspark.sql.DataFrame
オブジェクトを受け取り、返す計算関数から Transform
オブジェクトを構築するために使用されます。transform()
デコレータと同様に、入力名は計算関数のパラメーター名になります。ただし、transform_df
は位置引数として単一の Output
spec のみを受け入れます。計算関数の戻り値もまた DataFrame
であり、自動的に単一の出力データセットに書き出されます。Copied!1 2 3 4 5 6 7 8 9
>>> @transform_df( ... Output('/path/to/output/dataset'), # 名前のないOutput spec ... first_input=Input('/path/to/first/input/dataset'), ... second_input=Input('/path/to/second/input/dataset'), ... ) ... def my_compute_function(first_input, second_input): ... # type: (pyspark.sql.DataFrame, pyspark.sql.DataFrame) -> pyspark.sql.DataFrame ... # 第一の入力データと第二の入力データを結合する ... return first_input.union(second_input)
オプションとして、TransformContext および対応する SparkSession も compute 関数内でアクセス可能です。これを利用して空のデータフレームを作成したり、Spark の設定を適用したりすることができます。可能な限り、既存のデフォルトの Spark プロファイルを使用し、SparkSession オブジェクト経由で Spark の設定値を設定するのではなく、推奨します。
Copied!1 2 3 4 5 6 7 8 9 10 11
>>> @transform_df( ... Output('/path/to/output/dataset') ... ) ... def my_compute_function(ctx): ... # type: (TransformContext) -> pyspark.sql.DataFrame ... ... # この例では、Sparkセッションを使用して空のデータフレームを作成します。 ... columns = [ ... StructField("col_a", StringType(), True) ... ] ... return ctx.spark_session.createDataFrame([], schema=StructType(columns))
transform_pandas
transforms.api.transform_pandas
(output、inputs)Pandasライブラリを使用するには、meta.yml
ファイルで pandas
を実行依存関係に追加する必要があります。
transform_pandas
デコレータは、pandas.DataFrame
オブジェクトを受け取り、返す計算関数から Transform
オブジェクトを構築するために使用されます。このデコレータは、transform_df()
デコレータに似ていますが、計算の前に pyspark.sql.DataFrame
オブジェクトが pandas.DataFrame
オブジェクトに変換され、計算後に戻されます。
Copied!1 2 3 4 5 6 7 8
>>> @transform_pandas( ... Output('/path/to/output/dataset'), # 名前のないOutput仕様 ... first_input=Input('/path/to/first/input/dataset'), # 最初の入力データセット ... second_input=Input('/path/to/second/input/dataset'), # 二番目の入力データセット ... ) ... def my_compute_function(first_input, second_input): ... # type: (pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame ... return first_input.concat(second_input) # 二つの入力データフレームを結合
transform_pandas
は、メモリに収まるデータセットにのみ使用する必要があります。Pandasに変換する前に最初にフィルター処理したい大量のデータセットがある場合は、transform_df()
デコレータと pyspark.sql.SparkSession.createDataFrame()
メソッドを使用して変換を記述する必要があります。
Copied!1 2 3 4 5 6 7 8 9 10
>>> @transform_df( ... Output('/path/to/output/dataset'), # 名前のないOutput spec ... first_input=Input('/path/to/first/input/dataset'), # 最初の入力データセットのパス ... second_input=Input('/path/to/second/input/dataset'), # 二番目の入力データセットのパス ... ) ... def my_compute_function(ctx, first_input, second_input): ... # type: (pyspark.sql.DataFrame, pyspark.sql.DataFrame) -> pyspark.sql.DataFrame ... pd = first_input.filter(first_input.county == 'UK').toPandas() # データの一部にpandas操作を適用し、それをPySpark DataFrameに変換します ... # 'UK'というカウンティを持つ行だけをフィルタリングして、pandasのDataFrameに変換します ... return ctx.spark_session.createDataFrame(pd) # pandasのDataFrameをPySparkのDataFrameに戻します
transform_polars
transforms.api.transform_polars
(output, inputs)このデコレーターを使用するには、 meta.yml
ファイルに foundry-transforms-lib-python
と polars
を実行時依存関係として追加する必要があります。
transform_polars
デコレーターは、 polars.DataFrame
オブジェクトを受け取り返す計算関数から Transform
オブジェクトを構築するために使用されます。このデコレーターは transform_df()
デコレーターに似ていますが、ユーザーコードは polars.DataFrame
オブジェクトが渡されます。
transform_polars
デコレーターは @lightweight
デコレーターの薄いラッパーにすぎません。これを使用すると、通常のトランスフォームの一部の機能が欠けている軽量トランスフォームが作成されます。軽量トランスフォームの詳細については、 軽量ドキュメンテーションをご覧ください。
Spark プロファイルと 一部の他のトランスフォーム機能 は @lightweight
トランスフォームで使用できないため、 @transforms_polars
でも使用できません。
Copied!1 2 3 4 5 6 7 8 9
>>> @transform_polars( ... Output('ri.main.foundry.dataset.out'), # 名前のない Output spec ... first_input=Input('ri.main.foundry.dataset.in1'), # 最初の入力データ ... second_input=Input('ri.main.foundry.dataset.in2'), # 二番目の入力データ ... ) ... def my_compute_function(ctx, first_input, second_input): ... # type: (polars.DataFrame, polars.DataFrame) -> polars.DataFrame ... # 'id' を基に、first_input と second_input を結合する ... return first_input.join(second_input, on='id', how="inner")