注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
トレーニングのため、基本的な PySpark コードを使用して、passenger_flight_alerts_csv_raw
と passengers_json_raw
データセットのコピーをユーザーのデータソースプロジェクトフォルダーに作成します。これは、データ変換の導入 チュートリアルで生のフライトアラートとマッピングデータセットと同じように行います。結果として、ユーザーのパイプラインの乗客データソースセグメントの専用の開始点が得られますが、通常はデータ接続同期でソースファイルを単純に取り込み、同期したデータセットをユーザーの入力として使用します。
生のファイルを扱っているため、単純な PySpark アイデンティティ変換では十分ではありません。@transform()
デコレーターや独自の Python ライブラリに依存する特異なコードを実装する必要があります。
⚠️ データソースが存在する Foundry Training and Resources プロジェクトではなく、出力を生成するため、変換ファイル内のデータソースパスを必要に応じて調整し、ここで説明されているプロセスを使用して プロジェクト参照 を作成する必要があります。
ユーザーの画面の左上にあるコードリポジトリの ファイル セクションでフォルダー構造を展開し、/src
の下の /datasets
フォルダーを右クリックして、その中に /raw
という新しいフォルダーを作成します。
新しく作成した /raw
フォルダーに次の2つの Python ファイルを作成します:
passengers_raw.py
passenger_flight_alerts_raw.py
passenger_flight_alerts_raw.py
ファイルを開き、出力ファイルパスのルートにある大文字と小文字を区別する ネームスペース をメモします。例: Output("/thisIsTheNamespace/Foundry Training and Resources/...
エディター内の すべて のデフォルトコードを下記のコードブロックで置き換えます。なお、入力パスと出力パスをトレーニングプロジェクトのパスとユーザーのチュートリアル作成物フォルダーにそれぞれ合わせる必要があることに注意してください。
from transforms.api import transform, Input, Output
from shutil import copyfileobj
@transform(
output=Output("/${namespace}/Temporary Training Artifacts/${yourName}/Data Engineering Tutorials/Datasource Project: Passengers/data/raw/passenger_flight_alerts_raw"),
raw_file_input=Input("/${namespace}/Foundry Training and Resources/Example Projects/[Datasource] Passengers/datasets/raw/passenger_flight_alerts_csv_raw"),
)
def compute(output, raw_file_input):
"""
Create variables for the filesystems of the input and output datasets
"""
input_filesystem = raw_file_input.filesystem()
output_filesystem = output.filesystem()
"""
This function takes a row of file metadata as input
This function copies the file from the input dataset to the output dataset
"""
def copy_file_without_doing_anything(files_df_row):
with input_filesystem.open(files_df_row.path, 'rb') as f:
with output_filesystem.open(files_df_row.path, 'wb') as o:
copyfileobj(f, o)
"""
Create a dataframe containing paths and other file metadata for everything in the input dataset filesystem
In this dataframe, each row represents a single raw file from the input dataset
"""
files_df = input_filesystem.files()
"""
Runs the copy_file_without_doing_anything in parallel on Spark
This code will scale well because it leverages Spark rather than running on the driver
"""
files_df.foreach(copy_file_without_doing_anything)
コードをコピーしたら、ステップ4で記録したネームスペースに合わせて、6行目と7行目の ${namespace}
を置き換えます。
6行目の ${yourName}
を ユーザーの /Tutorial Practice Artifacts
フォルダーの名前に置き換えます(例:.../Foundry Reference Project/Tutorial Practice Artifacts/jmeier/...
)。
passengers_raw.py
ファイルでステップ3-6を繰り返し、デフォルトのコードを下記のコードブロックに置き換えます。
from transforms.api import transform, Input, Output
from shutil import copyfileobj
@transform(
output=Output("/${namespace}/Temporary Training Artifacts/${yourName}/Data Engineering Tutorials/Datasource Project: Passengers/data/raw/passengers_raw"),
raw_file_input=Input("/${namespace}/Foundry Training and Resources/Example Projects/[Datasource] Passengers/datasets/raw/passengers_json_raw"),
)
def compute(output, raw_file_input):
"""
Create variables for the filesystems of the input and output datasets
"""
input_filesystem = raw_file_input.filesystem()
output_filesystem = output.filesystem()
"""
This function takes a row of file metadata as input
This function copies the file from the input dataset to the output dataset
"""
def copy_file_without_doing_anything(files_df_row):
with input_filesystem.open(files_df_row.path, 'rb') as f:
with output_filesystem.open(files_df_row.path, 'wb') as o:
copyfileobj(f, o)
"""
Create a dataframe containing paths and other file metadata for everything in the input dataset filesystem
In this dataframe, each row represents a single raw file from the input dataset
"""
files_df = input_filesystem.files()
"""
Runs the copy_file_without_doing_anything in parallel on Spark
This code will scale well because it leverages Spark rather than running on the driver
"""
files_df.foreach(copy_file_without_doing_anything)
ℹ️ このチュートリアルでは、コピーしたコードの詳細は説明しませんが、コードのコメントを読んだり、Python の生ファイルアクセスに関するドキュメンテーションを参照したりすることは自由です。
ユーザーの画面の右上にある プレビュー ボタンをクリックして出力をプレビューします。プレビューウィンドウで 入力ファイルの設定 を求められた場合は、設定...
をクリックし、次の画面で passenger_flight_alerts.csv
の隣にあるボックスをチェックします。
次に、右側の青い 保存してプレビュー ボタンをクリックします。プレビューの出力は、行と列の一連のものではなく、単に生の CSV ファイルのコピーが生成されていることを確認するだけです。
意味のあるメッセージ(例:feature: add raw transform files)でコードをコミットします。
フィーチャーブランチ上で両方のデータセットをビルドします。
ビルドが完了したら、両方の変換ファイルのデータセットパスを、前のチュートリアルで説明されているプロセスを使用して RID に置き換えることを検討してみてください。
前のチュートリアルで説明されている PR プロセスを使用して、ユーザーのコードを Master
ブランチにマージします。
Master
ブランチで両方の生出力をビルドします。