データ統合Pythonライトウェイト変換ライトウェイト変換 API

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

ライトウェイト変換 API

ライトウェイト変換は PySpark クエリの実行をサポートしていません。代わりに、クエリは代替 API を使用して記述する必要があります。

Spark 変換は、特にデータがすでにローカルに存在する場合に、多数のノード間での計算をスケーリングするという大きなプラットフォーム能力を提供します。しかし、多くのデータ変換は、おそらく単一のマシンで管理可能です。データ処理に単一のマシンが十分な場合、Spark の使用を選択せずに、単一ノードの使用ケースに最適化された計算エンジンを使用してインフラストラクチャオーバーヘッドを削減することができます。

このドキュメンテーションでは、変換 API の @lightweight デコレーターについて説明します。このデコレーターは、Spark の使用を選択しないようにし、最大約1000万行のデータセットを処理するのに適したインフラストラクチャを要求するために、@transform@transform_pandas の上に配置することができます。@lightweight は、単一ノードの変換に最適化されたモダンな計算エンジンである Polars とのファーストクラスの統合も提供します。

実際のパフォーマンスは、パイプラインとデータの複雑さの両方に依存します。したがって、@lightweight バックエンドを使用するかどうかで、変換の実行時間を比較することをお勧めします。

ライトウェイトバックエンドで実行するときには、Spark DataFrames と Spark コンテキストは利用できません。

既存の変換機能の多くは @lightweight を使用しても利用可能です。同一のリポジトリ内で通常の変換とライトウェイト変換を混在させることができ、それらを プレビュー し、ライトウェイト変換を マーケットプレイス を通じてパッケージ化してインストールすることができます。ただし、サポートされていない機能もあり、それについては以下で詳しく読むことができます。

API のハイライト

以下のセクションでは、ライトウェイト変換 API を紹介します。API がさまざまなデータ処理エンジンとのインタラクションにどのように使用されているかの具体的な例を見るために、ライトウェイトの例を確認してください。

@lightweight を使用する前に、以下の前提条件を満たしていることを確認してください:

  1. Python リポジトリを最新バージョンにアップグレードする
  2. ライブラリータブから foundry-transforms-lib-python をインストールする。

リソースのプロビジョニング

リソースを要求するために Spark プロファイルに依存するのではなく、デコレーターを呼び出すときに cpu_coresmemory_gb または memory_mb のキーワード引数を通じて、リソースをより詳細に要求することができます。デフォルトでは、最大許容値は 8 コアと 32 GB のメモリです。これらの制限を増やすには、Palantir サポートに連絡してください。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 # polarsをplとしてインポートします import polars as pl # transforms.apiからいくつかの関数とクラスをインポートします from transforms.api import transform, Input, Output, lightweight # lightweightデコレータでCPUコア数とメモリサイズを指定します @lightweight(cpu_cores=3.4, memory_gb=16) # transformデコレータで入力データセットと出力先のパスを指定します @transform(output=Output('/Project/folder/output'), dataset=Input('/Project/folder/input')) def compute(output, dataset): # データセットから'Name'が'A'で始まる行をフィルタリングして、出力に書き出します out.write_table(dataset.polars(lazy=True).filter(pl.col('Name').str.starts_with('A')))

@lightweight()への引数としてリソース要求を渡すことで、ユーザーの変換に必要なリソースを微調整できます。下のスニペットにある値は、Spark変換の通常のデフォルトを反映しています。

リソースプロビジョニングAPIの追加利点として、ユーザーは今、GPUを効率的にリクエストできるようになりました:

Copied!
1 2 3 4 5 6 7 8 import torch # meta.yamlにpytorchとpytorch-gpuを追加することを忘れないでください。 import logging from transforms.api import transform, Output, lightweight @lightweight(gpu_type='NVIDIA_T4') # GPUタイプを'NVIDIA_T4'として設定 @transform(out=Output('/Project/folder/output')) # 出力先を'/Project/folder/output'に設定 def compute(out): logging.info(torch.cuda.get_device_name(0)) # 利用中のCUDAデバイスの名前をログに出力

上記のスニペットは、ユーザーのFoundryエンロールメントがNVIDIA T4 GPUを備えており、それがプロジェクトで利用可能であることを前提としています。

データフォーマット

@transform_pandasの上に@lightweightを使用する場合、@lightweightがない場合と同じAPIを使用できます。@transformの上に@lightweightを使用すると、ユーザー関数の入力と出力に追加のメソッドが提供されます。

Copied!
1 2 3 4 5 6 7 8 @lightweight @transform(output=Output('/Project/folder/output'), dataset=Input('/Project/folder/input')) def compute(output, dataset): polars_df = dataset.polars() # polars_dfはpolars.DataFrameオブジェクトです lazy_df = dataset.polars(lazy=True) # ストリーミングモードを有効にします、lazy_dfはpolars.LazyFrameです pandas_df = dataset.pandas() # pandas_dfはpandas.DataFrameです arrow_table = dataset.arrow() # arrow_tableはpyarrow.Tableです out.write_table(lazy_df) # 上記のどれでもwrite_tableに渡すことができます

上記のスニペットを参照して、利用可能なデータセット形式を確認してください。dataset.pandas() を呼び出すと、Pandas が環境にインストールされていることが期待されます。同様に、dataset.polars(...) は Polars が利用可能になっていることが必要です。

ファイルへのアクセス

軽量な入力と出力でも、.filesystem() などのよく知られたメソッドが利用できます。以下のスニペットでは、非構造化ファイル@lightweight なしで扱われるのと同じ方法で処理できることを示しています。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 # lightweightデコレータを使用します。これは、コードを簡素化し、メモリ使用量を抑えます。 @lightweight # transformデコレータを使用します。これは、入力と出力のパスを指定します。 @transform(my_output=Output('/Project/folder/output'), my_input=Input('/Project/folder/input')) # files関数を定義します。この関数は、my_inputから読み取ったファイルをmy_outputに書き込みます。 def files(my_input, my_output): # 入力フォルダ内の各ファイルに対してループを行います。 for file in my_input.filesystem().ls(): # 入力ファイルをバイナリ読み取りモードで開きます。 with my_input.filesystem().open(file.path, "rb") as f1: # 出力ファイルをバイナリ書き込みモードで開きます。 with my_output.filesystem().open(file.path, "wb") as f2: # 入力ファイルから読み取ったデータを出力ファイルに書き込みます。 f2.write(f1.read())

トランスフォームジェネレーター

トランスフォームジェネレーターをライトウェイトトランスフォームと一緒に使用することもサポートされています。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def create_transforms(): results = [] # 10と20の2つのサイズに対して変換を作成します for size in [10, 20]: # lightweightデコレーターを使用して、軽量の処理を行います @lightweight # transformデコレーターを使用して、出力と入力を定義します @transform( # 出力はroot_folder内の"demo-outputs"ディレクトリに保存されます output=Output(f"{root_folder}/demo-outputs/lightweight-polars-{size}"), # 入力はroot_folder内の"demo-inputs"ディレクトリから取得します df=Input(f"{root_folder}/demo-inputs/people-{size}") ) def lightweight_polars(output, df): # polarsの実装を用いてデータフレームを処理し、結果を出力します output.write_table(polars_implementation(df.polars(lazy=True))) # 結果をリストに追加します results.append(lightweight_polars) # 処理結果のリストを返します return results # TRANSFORMS変数に作成した変換のリストを保存します TRANSFORMS = create_transforms()

ライトウェイト変換の制限

以下の機能は、現在ライトウェイト変換でサポートされていません:

  • インクリメンタルな変換
  • データ期待値

Polars

Polars は、単一ノード変換に最適化されたモダンな計算エンジンです。PolarsはRustで記述されており、Pythonの薄いラッパーを提供して、パイプライン実行中にネイティブの高速化を活用しながら、ユーザーのコードをPythonで記述できるようにします。

例えば、これはPalantirがライトウェイト変換をベンチマークするために使用するPolarsのパイプラインです:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 def polars_implementation(polars_df): # データフレームの'ID'列をInt64型にキャストし、エイリアスとして'id'を付ける polars_df = polars_df.with_columns( pl.col("id").cast(pl.Int64).alias("id") ) # 'follows'列を爆発させて新たなデータフレームを作成し、 # 'id'列と'follows'列をそれぞれ'id1'と'id2'というエイリアスで選択 reciprocated_follows = ( polars_df .explode("follows") .select([ pl.col("id").alias("id1"), pl.col("follows").cast(pl.Int64).alias("id2"), ]) ) # reciprocated_followsを自己結合し、 # 'id1'と'id2'でグループ化した後、'id2'のカウントを取得し、エイリアスとして'reciprocated_follows_count'を付ける # その後、元のデータフレームと結合し、不要な列を削除 return ( polars_df .join( reciprocated_follows .join( reciprocated_follows, left_on=["id1", "id2"], right_on=["id2", "id1"], how="inner" ) .group_by("id1") .agg(pl.count("id2").alias("reciprocated_follows_count")), left_on="id", right_on="id1", how="left", ) .drop(["email", "dob", "id1", "follows"]) )

Polars ストリーミングモード

Polarsは、Lightweight transforms の作成に使用するためのデータ処理ライブラリとして推奨されています。可能な限り、ストリーミングモードでPolarsを使用することをお勧めします。このモードは、データをチャンク単位で読み込み、利用可能なメモリよりも大きなサイズのデータセットの処理を可能にします。ストリーミングモードにアクセスするには、.polars(lazy=True) メソッドを呼び出します。現在、UDFs とソートはストリーミングモードでは完全にサポートされていません。

変換がオーケストレーションレイヤーによって管理されるコンテナ内で実行されるため、コンテナのメモリ制限が超過するとコンテナが終了される可能性があります。 Polarsはストリーミングモードではこの制限を認識していないため、制限を超過する可能性があります。メモリ不足(OOM)エラーが発生した場合は、@lightweight(memory_gb=32) を設定するか、または別の適切な値に設定して、コンテナのメモリ制限を増やしてください。

次のステップ

Transforms API referenceでLightweight APIについて詳しく学び、または Lightweight examplesで実際の例を考慮してみてください。