注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
前のチュートリアルの手順を、passengers_preprocessed.py
変換に対しても繰り返します。
/preprocessed
フォルダー内にpassengers_preprocessed.py
という新しいPythonファイルを作成します。
passengers_preprocessed.py
ファイルを開き、デフォルトのコードを以下のコードに置き換えます。
from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import sanitize_schema_for_parquet
@transform(
parsed_output=Output("/${namespace}/Temporary Training Artifacts/${yourName}/Data Engineering Tutorials/Datasource Project: Passengers/datasets/preprocessed/passengers_preprocessed"),
raw_file_input=Input("${passengers_json_raw_RID}"),
)
def read_json(ctx, parsed_output, raw_file_input):
# Create a variable for the filesystem of the input datasets
filesystem = raw_file_input.filesystem()
# Create a variable for the hadoop path of the files in the input dataset
hadoop_path = filesystem.hadoop_path
# Create an array of the absolute path of each file in the input dataset
paths = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
# Create a Spark dataframe from all of the JSON files in the input dataset
df = ctx.spark_session.read.json(paths)
"""
Write the dataframe to the output dataset, using the sanitize_schema_for_parquet function
to make sure that the column names don't contain any special characters that would break the
output parquet file
"""
parsed_output.write_dataframe(sanitize_schema_for_parquet(df))
以下を置き換えます。
${namespace}
をユーザーの名前空間に置き換えます。${yourName}
をユーザーのチュートリアル練習アーティファクトフォルダー名に置き換えます。${passengers_json_raw_RID}
を、passengers_raw.py
で定義されたpassengers_json_raw
データセットのRIDに置き換えます。右上のプレビューボタンを使って、出力がデータセットとして表示されるかどうかを確認します。
テストが期待通りに動作したら、意味のあるメッセージ(例:「feature: add passengers_preprocessed」)でコードをコミットします。
フィーチャーブランチでpassengers_preprocessed.py
コードをビルドし、出力データセットに乗客とフライトアラートの2列マッピングが含まれていることを確認します。
入力/出力変換パスと関連するRIDを交換することを検討します。
ビルドが正常に完了したら、前のチュートリアルで説明したPRプロセスを使用して、フィーチャーブランチをMaster
にマージします。
最後に、Master
ブランチで両方の前処理済み出力をビルドします。