データ統合Pythonライトウェイト変換概要

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

概要

軽量トランスフォームは、Python のデータ処理パイプラインを実行する新たなバックエンドを表し、すでに馴染み深い トランスフォーム API のほとんどを使用することができます。

個々のコンピューターがますます強力になるにつれて、データ変換の数が増え、単一のノードで実行できるようになります。これは、小規模から中規模のデータセットの場合、分散並列処理に依存せずに変換を実行できることを意味します。このアプローチにより、Spark エグゼキュータの分散オーケストレーションに関連するオーバーヘッドを減らし、データパイプラインの作成に単一ノードの代替手段、例えば Polars (外部) や DuckDB (外部) を使用できるようになります。

クイックスタート

互換性

軽量トランスフォームは、コンテナオーケストレーションインフラストラクチャ上に構築されており、その機能を使用するためには、Foundry エンロールメントに存在していなければなりません。

この例では、Python トランスフォームパイプラインで軽量トランスフォームを使用する方法を示します。Pandas-on-Spark パイプラインが以下のようにあると仮定します:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from transforms.api import transform_pandas, Input, Output # pandasの変換を利用するためのデコレータ @transform_pandas( # 出力のパスを指定 Output('/Project/folder/output'), # 入力のパスを指定 df=Input('/Project/folder/input'), ) # 関数定義 def compute(df): return ( # 'Name'列が"A"で始まる行のみをフィルタリング df[df['Name'].str.startswith("A")] # 'Name'と'Age'の列のみを取得 .loc[:, ['Name', 'Age']] # 'Age'でソート .sort_values(by="Age") )

この内容を Lightweight トランスフォームに変更するには、次の手順が必要です。

  1. Python リポジトリを最新バージョンにアップグレードする
  2. ライブラリ タブから foundry-transforms-lib-python をインストールする。
  3. 既存のデコレータの上に @lightweight をインポートおよび適用する。次のコードスニペットに示すように行います。
Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 # transforms.apiから必要な関数をインポートします from transforms.api import transform_pandas, Input, Output, lightweight # lightweightデコレータは、この関数が大量のデータを処理するための最適化を行うことを示します @lightweight # この関数がパンダスのデータフレームを変換することを示すデコレータ @transform_pandas( # 出力パスを指定します Output('/Project/folder/output'), # 入力パスを指定します df=Input('/Project/folder/input'), ) # データフレームを受け取り、変換を行う関数を定義します def compute(df): return ( # 名前が"A"で始まる行だけをフィルタリングします df[df['Name'].str.startswith("A")] # 'Name'と'Age'の列だけを抽出します .loc[:, ['Name', 'Age']] # 'Age'に基づいて値をソートします .sort_values(by="Age") )

上記のように軽量トランスフォームに移行すると、小規模なデータ上でのトランスフォームの速度が約2倍になります。

上記のように、@lightweightは、@transform_pandasか、.pandas()メソッドのみに依存する@transformパイプラインとのみ互換性があります。

次に、トランスフォームがPolarsを使用してスケーラビリティを向上させることができます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import polars as pl from transforms.api import transform, Input, Output, lightweight @lightweight @transform( # @transform_pandas から @transform に変更しました output=Output('/Project/folder/output'), dataset=Input('/Project/folder/input'), ) def compute(output, dataset): output.write_table(dataset .polars() .filter(pl.col('Name').str.starts_with('A')) # 'A'で始まる名前をフィルタリング .select(['Name', 'Age']) # 'Name'と'Age'の列を選択 .sort(by='Age') # 'Age'でソート )

このパイプラインは、利用可能なすべての CPU コアを使用し、さらに、不要な操作を削除し、Pandas よりも効率的なアルゴリズムを見つけて操作を実行できる Polars のクエリ最適化エンジンも備えています。

次のステップ

Lightweight トランスフォームについてさらに学ぶには、Lightweight トランスフォーム API のドキュメントに進んでください。