注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
Copied!1 2 3 4 5 6 7 8
import polars as pl from transforms.api import transform, Input, Output, lightweight @lightweight(cpu_cores=3.4, memory_gb=16) # 軽量な変換デコレーター。CPUコア数とメモリ量を指定 @transform(output=Output('/Project/folder/output'), dataset=Input('/Project/folder/input')) def compute(output, dataset): # 入力データセットを読み込み、名前が「A」で始まる行をフィルタリングして、出力に書き込む output.write_table(dataset.polars(lazy=True).filter(pl.col('Name').str.starts_with('A')))
Copied!1 2 3 4 5 6 7 8
@lightweight @transform(output=Output('/Project/folder/output'), dataset=Input('/Project/folder/input')) def compute(output, dataset): polars_df = dataset.polars() # polars_df は polars.DataFrame オブジェクト lazy_df = dataset.polars(lazy=True) # ストリーミングモードを有効化、lazy_df は polars.LazyFrame オブジェクト pandas_df = dataset.pandas() # pandas_df は pandas.DataFrame オブジェクト arrow_table = dataset.arrow() # arrow_table は pyarrow.Table オブジェクト output.write_table(lazy_df) # 上記のいずれのフォーマットも write_table に渡すことが可能
Polars は Lightweight トランスフォームを作成するためのデータ処理ライブラリとして推奨されています。可能であれば、Polars をストリーミングモードで使用することをお勧めします。このモードではデータをチャンクごとに読み込み、メモリに収まりきらないサイズのデータセットを処理できるようになります。ストリーミングモードにアクセスするには、.polars(lazy=True)
メソッドを呼び出します。現在、UDF や GROUP BY
操作のような集計、および並べ替えはストリーミングモードでは完全にはサポートされていません。
トランスフォームがオーケストレーション層によって管理されているコンテナ内で実行されるため、コンテナのメモリ制限を超えるとコンテナが終了することがあります。ストリーミングモードでの Polars はこの制限を認識しておらず、制限を超える可能性があります。メモリ不足 (OOM) エラーが発生した場合、@lightweight(memory_gb=32)
または適切な値に設定してコンテナのメモリ制限を増やしてください。
Transforms API リファレンス で Lightweight API についてさらに学ぶか、Lightweight 例 で実際の例を確認してください。