注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
この内容は learn.palantir.com ↗ でも以下の内容をご覧いただけますが、アクセシビリティの観点から、ここに掲載しています。
トレーニングのために、基本的な PySpark コードを使用して、生のフライトアラートとマッピングデータセットを使用した データ変換の紹介 チュートリアルと同じように、ユーザーのデータソースプロジェクトフォルダーに passenger_flight_alerts_csv_raw
と passengers_json_raw
データセットのコピーを作成します。その結果、パイプラインの乗客データソースセグメントの専用のスタートポイントが作成されますが、通常は Data Connection 同期でソースファイルを取り込み、同期したデータセットを入力として使用します。
生のファイルを扱うため、単純な PySpark アイデンティティトランスフォームでは十分ではありません。@transform()
デコレーターと独自の Python ライブラリに依存する独自のコードを実装する必要があります。
⚠️ データソースが存在する Foundry Training and Resources プロジェクトではない場所で出力を生成するため、トランスフォームファイルのデータソースパスを必要に応じて調整し、要求されたときに ここ で説明されているプロセスを使用して プロジェクトリファレンス を作成する必要があります。
画面の左上にあるコードリポジトリの ファイル セクションでフォルダー構造を展開し、/src
の下の /datasets
フォルダーを右クリックして、その中に新しいフォルダー /raw
を作成します。
新しい /raw
フォルダーに2つの新しい Python ファイルを作成します:
passengers_raw.py
passenger_flight_alerts_raw.py
passenger_flight_alerts_raw.py
ファイルを開き、出力ファイルパスのルートにある大文字と小文字を区別する スペース をメモしてください。例えば:Output("/thisIsTheSpace/Foundry Training and Resources/...
エディターのすべてのデフォルトコードを以下のコードブロックに置き換えます。ただし、入力と出力のパスは、トレーニングプロジェクトのパスとユーザーのチュートリアル作成物フォルダーにそれぞれ合わせる必要があります。
from transforms.api import transform, Input, Output
from shutil import copyfileobj
@transform(
output=Output("/${space}/Temporary Training Artifacts/${yourName}/Data Engineering Tutorials/Datasource Project: Passengers/data/raw/passenger_flight_alerts_raw"),
raw_file_input=Input("/${space}/Foundry Training and Resources/Example Projects/[Datasource] Passengers/datasets/raw/passenger_flight_alerts_csv_raw"),
)
def compute(output, raw_file_input):
"""
Create variables for the filesystems of the input and output datasets
"""
input_filesystem = raw_file_input.filesystem()
output_filesystem = output.filesystem()
"""
This function takes a row of file metadata as input
This function copies the file from the input dataset to the output dataset
"""
def copy_file_without_doing_anything(files_df_row):
with input_filesystem.open(files_df_row.path, 'rb') as f:
with output_filesystem.open(files_df_row.path, 'wb') as o:
copyfileobj(f, o)
"""
Create a dataframe containing paths and other file metadata for everything in the input dataset filesystem
In this dataframe, each row represents a single raw file from the input dataset
"""
files_df = input_filesystem.files()
"""
Runs the copy_file_without_doing_anything in parallel on Spark
This code will scale well because it leverages Spark rather than running on the driver
"""
files_df.foreach(copy_file_without_doing_anything)
コードをコピーしたら、ステップ 4 でメモしたスペースに合わせて、行 6 と 7 の ${space}
を置き換えます。
行 6 の ${yourName}
をユーザーの /Tutorial Practice Artifacts
フォルダーの名前に置き換えます(例えば、.../Foundry Reference Project/Tutorial Practice Artifacts/jmeier/...
)。
passengers_raw.py
ファイルでステップ 3-6 を繰り返し、デフォルトのコードを以下のコードブロックに置き換えます。
from transforms.api import transform, Input, Output
from shutil import copyfileobj
@transform(
output=Output("/${space}/Temporary Training Artifacts/${yourName}/Data Engineering Tutorials/Datasource Project: Passengers/data/raw/passengers_raw"),
raw_file_input=Input("/${space}/Foundry Training and Resources/Example Projects/[Datasource] Passengers/datasets/raw/passengers_json_raw"),
)
def compute(output, raw_file_input):
"""
Create variables for the filesystems of the input and output datasets
"""
input_filesystem = raw_file_input.filesystem()
output_filesystem = output.filesystem()
"""
This function takes a row of file metadata as input
This function copies the file from the input dataset to the output dataset
"""
def copy_file_without_doing_anything(files_df_row):
with input_filesystem.open(files_df_row.path, 'rb') as f:
with output_filesystem.open(files_df_row.path, 'wb') as o:
copyfileobj(f, o)
"""
Create a dataframe containing paths and other file metadata for everything in the input dataset filesystem
In this dataframe, each row represents a single raw file from the input dataset
"""
files_df = input_filesystem.files()
"""
Runs the copy_file_without_doing_anything in parallel on Spark
This code will scale well because it leverages Spark rather than running on the driver
"""
files_df.foreach(copy_file_without_doing_anything)
ℹ️ このチュートリアルでは、コピーしたコードの詳細は説明しませんが、コードのコメントを読み進めたり、必要に応じて Python raw file access のドキュメンテーションを参照することができます。
画面の右上にある プレビュー ボタンをクリックして出力をプレビューします。プレビューウィンドウで 入力ファイルを設定する を求められた場合は、設定...
をクリックし、次の画面で passenger_flight_alerts.csv
の隣のボックスをチェックします。
次に、右側の青い 保存してプレビュー ボタンをクリックします。プレビューの出力は行と列の一連ではなく、生の CSV ファイルのコピーが生成されることを単に確認します。
意味のあるメッセージ(例えば、機能:生の変換ファイルを追加)でコードをコミットします。
フィーチャーブランチで両方のデータセットをビルドします。
ビルドが完了したら、以前のチュートリアルで説明されているプロセスを使用して、両方の変換ファイルのデータセットパスを RID に置き換えることを検討してください。
以前のチュートリアルで説明されている PR プロセスを使用して、コードを Master
ブランチにマージします。
Master
ブランチで両方の生の出力をビルドします。