データ統合Pythonライトウェイト変換ライトウェイト変換の例

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

ライトウェイト変換の例

ライトウェイト変換は速度だけでなく、汎用性も備えています。ライトウェイト変換は、ユーザーが選択した計算エンジンについての仮定をしません。以下の内容では、Lightweight APITransforms API に依存しながら、さまざまな計算エンジンがどのように活用できるかを紹介します。その後、必要な環境を含む独自の Docker イメージを使用して、非常にカスタマイズされた非 Python データ処理アプリケーションを使用する方法を実演します。

現代のシングルノード計算エンジン

以下のすべての統合の基本原則は、Pandas DataFrame、Arrow Table、Polars DataFrame、さらには raw Parquet や CSV ファイルといった複数の形式で tabular Foundry データセット にアクセスできることです。これは Lightweight API でも示されています。メモリ内のテーブルを Foundry に保存しようとするとき、読み込んだ形式のいずれかでそれらを渡すことができます。

Ibis の使用

現代の計算エンジンのほとんどは、データシステムを分散させる概念を採用しており、そのため業界標準のオープンソースソフトウェアで動作します。メモリ内のテーブルを格納するための事実上の標準は、Arrow(外部)です。はじめに、Ibis(外部)(DuckDB バックエンド付き)を使用します。これは、Arrow 形式を内部で使用します。この場合、my_input.arrow() コールを通じて Foundry データセットを pyarrow.Table(外部)オブジェクトとして読み込み、変換し、次に my_output.write_table(...) を使って変換されたオブジェクトを Foundry に書き戻すことができます。次の例スニペットを考慮してください:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 @lightweight @transform(my_input=Input('my-input'), my_output=Output("my-output")) def my_ibis_transform(my_input, my_output): ibis_client = ibis.duckdb.connect(':memory:') # Foundryデータセットをpyarrow.Tableオブジェクトとして取得します table = ibis_client.read_in_memory(my_input.arrow()) # データ変換を実行します results = table .filter(table['name'].like('John%')) .execute() # pyarrow.TableオブジェクトをFoundryデータセットに保存します my_output.write_table(results)

Apache DataFusion と DuckDB の利用

場合によっては、データのデシリアライズ手順を省略して、データセットの生の基本ファイルを直接計算エンジンに渡す方が簡単です。ディスク上のファイルへのパス(オンデマンドでダウンロードされる)を取得するには、my_input.path() を呼び出します。生のファイルを Foundry に戻す際には、次の 2 つの制限事項を考慮する必要があります。

  • この API を通じて Foundry データセットに保存できるのは Parquet ファイルのみです。
  • ファイルは、my_output.path_for_write_table の値にあるフォルダーに配置する必要があります。

これらの条件が満たされている場合、このフォルダーへのパスを指定して write_table を呼び出すことができます。具体的には、my_output.write_table(my_output.path_for_write_table) のようにします。これを実際に使ってみるために、プラットフォーム内で DataFusion(外部)を使用する方法を示す次のスニペットを検討してください。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 @lightweight @transform(my_input=Input('my-input'), my_output=Output("my-output")) def my_datafusion_transform(my_input, my_output): ctx = datafusion.SessionContext() # Datafusionのセッションコンテキストを作成します。 table = ctx.read_parquet(my_input.path()) # my_inputのパスからparquetファイルを読み込み、テーブルを作成します。 my_output.write_table( table .filter(starts_with(col("name"), literal("John"))) # "name"列が"John"で始まる行だけをフィルタリングします。 .to_arrow_table() # 結果のテーブルをArrow形式に変換します。 )

同じアプローチを DuckDB (外部) にも適用できます。次のスニペットが示すように、フォルダーからすべての Parquet ファイルを読み込む必要があります。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @lightweight @transform(my_input=Input('my-input'), my_output=Output("my-output")) def my_duckdb_transform(my_input, my_output): duckdb.connect(database=':memory:').execute(f""" COPY ( SELECT * FROM parquet_scan('{my_input.path()}/**/*.parquet') WHERE Name LIKE 'John%' ) TO '{my_output.path_for_write_table}' (FORMAT 'parquet', PER_THREAD_OUTPUT TRUE) """) # スレッドごとに並行して別々のParquetファイルを書き込むことでパフォーマンスを最適化 my_output.write_table(my_output.path_for_write_table)

cuDF の利用

pandas.DataFrame を使用して統合を実現することもできます。次のスニペットでは、Lightweight トランスフォームで cuDF(外部)を使用した例を示しています。これにより、可能な場合は GPU 上で高度に並列化された方法で Pandas のコードが実行されます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @lightweight(gpu_type='NVIDIA_T4', cpu_cores=4, memory_gb=32) @transform(my_input=Input('my-input'), my_output=Output("my-output")) def my_cudf_transform(my_input, my_output): import cudf # 実行時にのみCUDFをインポートし、CI中はインポートしない # pandasのデータフレームからcudfのデータフレームに変換 df = cudf.from_pandas(my_input.pandas()[['name']]) # 名前が "John" で始まる行をフィルタリング filtered_df = df[df['name'].str.startswith('John')] # 名前でソート sorted_df = filtered_df.sort_values(by='name') # 結果を出力 my_output.write_table(sorted_df)

上記のスニペットでは、ユーザーの Foundry エンロールメントが NVIDIA T4 GPU を搭載しており、リソースキューを介してプロジェクトで使用できることを前提としています。

独自のコンテナワークフローを持ち込む

ほとんどの計算は、Python だけで簡単に表現できます。ただし、Python 以外の実行エンジンやスクリプトに依存したい場合もあります。たとえば、F# アプリケーションや古い VBA スクリプト、あるいは Python のサポートが終了したバージョンなどが考えられます。独自のコンテナ(BYOC)ワークフローを使用することで、可能性は無限大です。簡単に言えば、BYOC は、アプリケーションが必要とする特定の依存関係やバイナリをすべて含んだ Docker イメージをローカルで作成し、そのイメージの上で Lightweight な変換を実行する機能を提供します。

以下は、Foundry で COBOL 変換を実行する例です。簡単のため、COBOL プログラムは変換内でコンパイルされますが、バイナリ実行可能ファイルが Docker イメージにコピーされた状態で事前にコンパイルされている場合もあります。まず、コンテナ化されたワークフローを有効にし、これらのイメージ要件に従ってローカルで Docker イメージを構築します。

次に、最小限の Dockerfile の例を考えてみましょう。

Copied!
1 2 3 4 5 6 7 8 9 10 # ベースイメージとして最新のUbuntuを使用 FROM ubuntu:latest # 必要なパッケージをインストール RUN apt update && apt install -y coreutils curl sed build-essential gnucobol # 新しいユーザー(uid 5001)を追加 RUN useradd --uid 5001 user # 作成したユーザーに切り替え USER 5001

DockerイメージにPythonがインストールされていなくても、Lightweightが動作することに注意してください。

その後、イメージをビルドし、Foundryにアップロードします。そのためには、Artifactsリポジトリを作成しタグを付けてプッシュするためのこれらの指示に従います。この例では、イメージにはmy-imageという名前と0.0.1というバージョンのタグが付けられています。BYOC Lightweight変換を作成する前の最後のステップは、このArtifactsリポジトリをローカルのバックアップリポジトリとして追加することです

次の例は、COBOLスクリプトを考慮に入れています。このスクリプトはCSVファイルを生成し、そのソースコードはresources/data_generator.cblに位置しています。

最後のステップは、データ処理プログラムとFoundryを接続するLightweight変換を書くことです。以下のスニペットは、Python APIを通じてデータセットにアクセスする方法と、問題のイメージ内に含まれる任意の実行可能ファイルを含む方法を示しています。COBOL実行可能ファイルを呼び出すには、Python標準ライブラリの関数を使用します(この場合は、os.system(...))。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 @lightweight(container_image='my-image', container_tag='0.0.1') @transform(my_output=Output('my_output')) def compile_cobol_data_generator(my_output): """Condaを通じて取得するのが難しい依存関係を示します。""" # Cobolプログラムをコンパイルします # (srcフォルダーからのすべては$USER_WORKING_DIR/user_codeで利用可能です) os.system("cobc -x -free -o data_generator $USER_WORKING_DIR/user_code/resources/data_generator.cbl") # プログラムを実行してdata.csvを作成し、データを追加します os.system('$USER_WORKING_DIR/data_generator') # 結果をFoundryに保存します my_output.write_table(pd.read_csv('data.csv'))

上記のコードは、Cobolプログラムをコンパイルし、そのプログラムを実行してデータを生成し、その結果をFoundryに保存するというタスクを行っています。

プレビュー

BYOCワークフローのプレビューはまだサポートされていません。

ビルドボタンを使用すると、最終的にはDockerイメージからコンテナをインスタンス化し、指定されたコマンドを実行します。リソースの割り当て、ログ取得、Foundryとの通信、権限の強制、および可監査性はすべて自動的に管理されます。

次のステップ

Lightweight transformsについて詳しく知りたい場合は、ユーザーのFoundryデプロイメントのReference ResourcesマーケットプレイスストアからLightweight examplesマーケットプレイス商品をインストールするか、Transforms API referenceを参照してください。