注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
>>> students_input = foundry.input('/examples/students_hair_eye_color') # "/examples/students_hair_eye_color"というパスから学生のデータを読み込む
>>> students_input.dataframe().sort('id').show(n=3) # "id"に基づいてデータをソートし、最初の3行を表示する
+---+-----+-----+----+
| id| hair| eye| sex|
+---+-----+-----+----+
| 1|Black|Brown|Male|
| 2|Brown|Brown|Male|
| 3| Red|Brown|Male|
+---+-----+-----+----+
上記は最初の3行のデータのみを表示します
これで、/examples/students_hair_eye_color
を入力として受け取り、/examples/hair_eye_color_processed
を出力として生成する Transform
を定義することができます:
これで、複数の Output
仕様を transform()
デコレーターに渡して、入力を分割することができるようになりました:
Copied!1 2 3 4 5 6 7 8 9 10 11 12
from transforms.api import transform, Input, Output @transform( hair_eye_color=Input('/examples/students_hair_eye_color'), # 生徒の髪の色と瞳の色のデータ males=Output('/examples/hair_eye_color_males'), # 男性のデータ出力先 females=Output('/examples/hair_eye_color_females'), # 女性のデータ出力先 ) def brown_hair_by_sex(hair_eye_color, males, females): # type: (TransformInput, TransformOutput, TransformOutput) -> None brown_hair_df = hair_eye_color.dataframe().filter(hair_eye_color.dataframe().hair == 'Brown') # 髪の色がブラウンの生徒をフィルター males.write_dataframe(brown_hair_df.filter(brown_hair_df.sex == 'Male')) # その中から男性を抽出して出力 females.write_dataframe(brown_hair_df.filter(brown_hair_df.sex == 'Female')) # その中から女性を抽出して出力
次に、上記の Transform decorator セクションの例を変更して、transform_df()
デコレータを使用します。入力として /examples/students_hair_eye_color
を取り、出力として /examples/hair_eye_color_processed
を作成する Transform
を定義します:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
from transforms.api import transform, Output @transform( out=Output('/examples/context') ) def generate_dataframe(ctx, out): # type: (TransformContext) -> pyspark.sql.DataFrame # DataFrameを作成します。ここでは、スパークセッションを使用して、 # データフレームを直接作成しています。データフレームは、 # ['a', 1], ['b', 2], ['c', 3] のようなデータと # 'letter', 'number' のようなスキーマから作成されます。 df = ctx.spark_session.createDataFrame([ ['a', 1], ['b', 2], ['c', 3] ], schema=['letter', 'number']) # 出力にデータフレームを書き込みます。 out.write_dataframe(df)
TLLVが正しく機能するためには、ユーザーのコードはすべてのインポートをモジュールレベルで宣言し、別のモジュールのオブジェクトをパッチまたは他の方法で変更しようとしないでください。ユーザーのプロジェクトでこれが満たされていない場合、TLLVを無効にしなければなりません。詳細は以下のコード例を参照してください。
TLLVはデフォルトで有効になっています。 TLLVを無効にするには、transformsPython設定のtllvをfalseに設定します。この設定は、ユーザーのTransforms Pythonサブプロジェクトの build.gradle
ファイル内にあります。
Copied!1 2 3
transformsPython { tllv false # tllvをfalseに設定します。これは、Python変換を無効にすることを意味します。 }
トランスフォームのバージョンは、ロジックの陳腐化を考慮する際に、2つのトランスフォームのバージョンを比較するために使用される文字列です。トランスフォームの出力は、その入力が変更されず、トランスフォームのバージョンが変更されない場合、最新の状態になります。バージョンが変更されると、トランスフォームの出力は無効になり、再計算されます。
デフォルトでは、トランスフォームのバージョンには以下のものが含まれます:
これらのいずれかが変更されると、バージョン文字列も変更されます。 リストに記載されていないパートで変更が発生した場合に出力を無効にしたい場合は、transformsPython設定でtllvFilesを設定します。たとえば、ファイルの設定を読み込んでいて、設定が変更されたときに出力を無効にしたい場合などです。
Copied!1 2 3 4 5 6
transformsPython { # tllvFilesは、プロジェクトディレクトリに対する相対パスで含めたいファイルのパスをリスト形式で指定します tllvFiles = [ 'path/to/file/you/want/to/include/relative/to/project/directory' ] }
プロジェクトの依存関係バージョンが変更されたときに出力が無効になるのを避けるには、tllvIncludeDeps を false に設定してください。
transformsPython {
tllvIncludeDeps false // tllvIncludeDepsをfalseに設定します。これにより、依存関係は含まれません。
}
次のコード例は、有効なインポートと無効なインポートを示しています:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
# モジュールのトップでのみインポートしている場合、心配する必要はありません。 from transforms.api import transform_df, Input, Output from myproject.datasets import utils from myproject.testing import test_mock as tmock import importlib # モジュールスコープであれば、`importlib`の`__import__`を使用することは問題ありません。 logging = __import__('logging') my_compute = importlib.import_module('myproject.compute') def helper(x): # これは無効です。関数やクラスメソッドでインポートする場合はTLLVを無効にしなければなりません。 # すべてのインポートはモジュールスコープ内で行う必要があります。 import myproject.helpers as myhelp return myhelp.round(x) @transform_df( Output("/path/to/output/dataset"), my_input=Input("/path/to/input/dataset"), ) def my_compute_function(my_input): # これは無効です。関数内でインポートを使用したい場合はTLLVを無効にしなければなりません! ihelper = __import__('myproject.init_helper') my_functions = importlib.import_module('myproject.functions') return my_input
Copied!1 2 3 4 5
# transforms.api ライブラリから Pipeline をインポートします from transforms.api import Pipeline # Pipeline のインスタンスを作成します my_pipeline = Pipeline()
次のようにして、このPipeline
をsetup.py
に登録できます:
Copied!1 2 3 4 5 6 7 8 9 10 11 12
import os from setuptools import find_packages, setup setup( entry_points={ # 'transforms.pipelines' はエントリーポイントの名前です 'transforms.pipelines': [ # 'root = myproject.pipeline:my_pipeline' はエントリーポイントとして指定する関数の場所を示しています 'root = myproject.pipeline:my_pipeline' ] } )
上記のコードでは、root
はエクスポートしているPipeline
の名前を指し、myproject.pipeline
はPipeline
を含むモジュールを指し、my_pipeline
はそのモジュールで定義されたPipeline
属性を指します。
プロジェクトのPipelineに関連付けられたTransform
オブジェクトがデータセットをOutput
として宣言すると、Foundry でこのデータセットを構築できます。Transform
オブジェクトをPipeline
に追加する2つの推奨方法は、手動登録と自動登録です。
より高度なワークフローを持っている場合や、プロジェクトのPipelineにTransform
オブジェクトを明示的に追加したい場合は、手動登録を使用できます。それ以外の場合は、登録コードが簡潔でまとまっていることを確認するために、自動登録を使用することを強くお勧めします。自動登録では、discover_transforms
メソッドがモジュールレベルで定義されたTransform
オブジェクトを再帰的に検出します。詳細については、以下のセクションを参照してください。
discover_transforms
メソッドは、見つかったすべてのモジュールをインポートします。その結果、インポートされたモジュール内のコードは実行されます。
プロジェクトの複雑さが増すにつれて、Transform
オブジェクトをPipeline
に手動で追加することが煩雑になることがあります。そこで、Pipeline
オブジェクトは、Pythonモジュールやパッケージ内のすべてのTransform
オブジェクトを再帰的に検出するためのdiscover_transforms()
メソッドを提供しています。
Copied!1 2 3 4 5 6 7 8 9 10 11
# transforms.apiからPipelineをインポートします from transforms.api import Pipeline # my_moduleをインポートします import my_module # Pipelineのインスタンスを作成します my_pipeline = Pipeline() # my_module内の変換を探し出します my_pipeline.discover_transforms(my_module)
Transform
オブジェクトは、add_transforms()
関数を使用してPipeline
に手動で追加することができます。この関数は任意の数のTransform
オブジェクトを取り、それらをPipelineに追加します。また、2つのTransform
オブジェクトが同じ出力データセットを宣言していないことを確認します。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
# 必要なライブラリをインポートします from transforms.api import transform_df, Pipeline, Input, Output # transform_dfデコレータを使用して、入力データセットを出力データセットに変換する関数を定義します @transform_df( Output('/path/to/output/dataset'), # 出力データセットのパスを指定します my_input=Input('/path/to/input/dataset') # 入力データセットのパスを指定します ) def my_compute_function(my_input): # 計算機能を定義します return my_input # 入力データセットをそのまま返します # Pipelineオブジェクトを作成します my_pipeline = Pipeline() # 作成したパイプラインに変換機能を追加します my_pipeline.add_transforms(my_compute_function)
複数の出力を生成するデータ変換を定義したい場合、トランスフォーム生成を使用するか、複数出力のトランスフォームを定義することができます。トランスフォーム生成では、各出力のために入力を一度読み込み、処理する必要があるかもしれません。複数出力のトランスフォームでは、入力を一度だけ読み込み、処理することが可能です。
複数のトランスフォームオブジェクトで同じデータ変換ロジックを再利用したい場合があります。たとえば、次のようなシナリオを考えてみてください:
これらのケースでは、複数のトランスフォームで同じデータ変換コードを使用すると便利です。各出力ごとにトランスフォームオブジェクトを個別に定義するのではなく、forループを使用してトランスフォームオブジェクトを生成し、それらをユーザーのプロジェクトのパイプラインに一括で登録することができます。トランスフォームの生成については以下の例を参照してください:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
from transforms.api import transform_df, Input, Output def transform_generator(sources): # type: (List[str]) -> List[transforms.api.Transform] transforms = [] # この例では複数の入力データセットを使用しています。また、単一の入力データセットから複数の出力を生成することもできます。 for source in sources: @transform_df( Output('/sources/{source}/output'.format(source=source)), # 出力のパスを指定します my_input=Input('/sources/{source}/input'.format(source=source)) # 入力のパスを指定します ) def compute_function(my_input, source=source): # 関数内でsource変数をキャプチャするために、それをデフォルトのキーワード引数として渡します。 return my_input.filter(my_input.source == source) # sourceに一致するデータだけをフィルタリングします transforms.append(compute_function) # 作成した関数をtransformsリストに追加します return transforms TRANSFORMS = transform_generator(['src1', 'src2', 'src3']) # transform_generator関数を['src1', 'src2', 'src3']で呼び出し、結果をTRANSFORMSに格納します
Copied!1 2 3 4 5 6 7
# my_moduleをインポートします import my_module # Pipelineクラスのインスタンスを作成します my_pipeline = Pipeline() # my_moduleで定義された変換をパイプラインに追加します my_pipeline.add_transforms(*my_module.TRANSFORMS)
手動での登録では、Code RepositoriesのBuildボタンが機能しない場合があり、要求されたファイルからのパイプラインにトランスフォームが見つからないというエラーが表示されます。それでも、これらのデータセットはData LineageまたはDataset Previewアプリケーションでビルドできます。