注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
Datasource Project: Flight Alerts プロジェクトのデータセットと同様に、passengers と passenger_flight_alerts(乗客をフライトアラートにマッピングする)データセットにも前処理ステップが必要です。予想通り、最初のタスクはデータセットファイルを読み込み、Spark DataFrames にパースして、最適化された Parquet 形式に書き込むことです。
Parquet は、列名に特殊文字やスペースを許可していないため、前処理ステップでは列名のサニタイズも行う必要があります。Foundry は、transforms.verbs.dataframes
パッケージの sanitize_schema_for_parquet
など、コードベースの変換を効率化するための API やパッケージを提供しています。これにより、DataFrame の列名から Parquet ファイルとして保存できない文字が削除され、データセットのビルドが正常に行われるようになります。
まず、Master
から yourName/feature/tutorial_preprocessed_files
という名前の新しいブランチを作成します。
コードリポジトリの Files セクションで /datasets
フォルダーを右クリックし、新しいフォルダー /preprocessed
を作成します。
/preprocessed
フォルダーに passenger_flight_alerts_preprocessed.py
という名前の新しい Python ファイルを作成します。
passenger_flight_alerts_preprocessed.py
ファイルを開き、デフォルトの内容を以下のコードブロックに置き換えます。
from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import sanitize_schema_for_parquet
@transform(
parsed_output=Output("/${namespace}/Temporary Training Artifacts/${yourName}/Data Engineering Tutorials/Datasource Project: Passengers/datasets/preprocessed/passenger_flight_alerts_preprocessed"),
raw_file_input=Input("${passenger_flight_alerts_csv_raw_RID}"),
)
def read_csv(ctx, parsed_output, raw_file_input):
# Create a variable for the filesystem of the input datasets
filesystem = raw_file_input.filesystem()
# Create a variable for the hadoop path of the files in the input dataset
hadoop_path = filesystem.hadoop_path
# Create an array of the absolute path of each file in the input dataset
paths = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
# Create a Spark dataframe from all of the CSV files in the input dataset
df = (
ctx
.spark_session
.read
.option("encoding", "UTF-8") # UTF-8 is the default
.option("header", True)
.option("inferSchema", True)
.csv(paths)
)
"""
Write the dataframe to the output dataset using the sanitize_schema_for_parquet function
to make sure that the column names don't contain any special characters that would break the
output parquet file
"""
parsed_output.write_dataframe(sanitize_schema_for_parquet(df))
以下を置き換えます。
${namespace}
を ユーザーの ネームスペースに置き換えます。${yourName}
を ユーザーの Tutorial Practice Artifacts フォルダー名に置き換えます。${passenger_flight_alerts_raw_RID}
を、passenger_flight_alerts_csv_raw
データセットの RID に置き換えます。これは、passenger_flight_alerts_raw.py
で定義された出力です。前のタスクでパスを RID に置き換えた場合、そのファイルから RID をコピーしてこのファイルに貼り付けることができます。右上の Preview ボタンを使用して、コードをテストし、出力がデータセットとして表示されることを確認します。プレビューウィンドウで Configure input files と表示された場合は、Configure...
をクリックし、次の画面で passenger_flight_alerts.csv
の横にあるボックスにチェックを入れます。
テストが期待通りに機能した場合は、意味のあるメッセージ(例:「feature: add passenger_flight_alerts_preprocessed」)を付けてコードをコミットします。
フィーチャーブランチで passenger_flight_alerts_preprocessed.py
コードをビルドし、出力データセットに乗客とフライトアラートの2列マッピングが含まれていることを確認します。