注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
この内容は learn.palantir.com ↗ でもご覧いただけますが、アクセシビリティの観点から、ここに掲載しています。
ユーザーの Datasource Project: Flight Alerts プロジェクトのデータセットと同様に、乗客と乗客フライトアラート(乗客とフライトアラートをマッピング)データセットにも前処理ステップが必要になります。想像できるように、最初のステップはデータセットファイルを読み込み、Spark DataFrame にパースして、最適化された Parquet 形式に書き出すことです。
Parquet では、列名に特殊文字やスペースを許可しないため、前処理ステップでは列名のサニタイズも必要になります。Foundry が提供する API やパッケージの中には、transforms.verbs.dataframes
パッケージの sanitize_schema_for_parquet
のように、コードベースのトランスフォームを効率化するものがあります。これは、DataFrame の列名から Parquet ファイルとして保存できない文字をすべて削除し、データセットのビルドが成功するようにします。
最初に、Master
から新しいブランチを作成し、yourName/feature/tutorial_preprocessed_files
と名前を付けます。
コードリポジトリの Files セクションにある /datasets
フォルダーを右クリックし、新しいフォルダー /preprocessed
を作成します。
/preprocessed
フォルダーに新しい Python ファイル passenger_flight_alerts_preprocessed.py
を作成します。
passenger_flight_alerts_preprocessed.py
ファイルを開き、下記のコードブロックでデフォルトの内容を置き換えます。
from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import sanitize_schema_for_parquet
@transform(
parsed_output=Output("/${space}/Temporary Training Artifacts/${yourName}/Data Engineering Tutorials/Datasource Project: Passengers/datasets/preprocessed/passenger_flight_alerts_preprocessed"),
raw_file_input=Input("${passenger_flight_alerts_csv_raw_RID}"),
)
def read_csv(ctx, parsed_output, raw_file_input):
# Create a variable for the filesystem of the input datasets
filesystem = raw_file_input.filesystem()
# Create a variable for the hadoop path of the files in the input dataset
hadoop_path = filesystem.hadoop_path
# Create an array of the absolute path of each file in the input dataset
paths = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
# Create a Spark dataframe from all of the CSV files in the input dataset
df = (
ctx
.spark_session
.read
.option("encoding", "UTF-8") # UTF-8 is the default
.option("header", True)
.option("inferSchema", True)
.csv(paths)
)
"""
Write the dataframe to the output dataset using the sanitize_schema_for_parquet function
to make sure that the column names don't contain any special characters that would break the
output parquet file
"""
parsed_output.write_dataframe(sanitize_schema_for_parquet(df))
以下の項目を置き換えます。
${space}
をユーザーの space に置き換えます。${yourName}
をユーザーの Tutorial Practice Artifacts フォルダー名に置き換えます。${passenger_flight_alerts_raw_RID}
を passenger_flight_alerts_csv_raw
データセットの RID に置き換えます。これは passenger_flight_alerts_raw.py
で定義された出力です。前のタスクでパスを RID に置き換えた場合は、そのファイルから RID をコピーしてこのファイルに貼り付けることができます。右上の Preview ボタンを使用して、コードをテストし、出力が raw ファイルではなくデータセットとして表示されることを確認します。プレビューウィンドウで Configure input files を求められた場合は、Configure...
をクリックし、次の画面で passenger_flight_alerts.csv
の隣のボックスにチェックを入れます。
テストが期待通りに動作した場合は、コードを「feature: add passenger_flight_alerts_preprocessed」など、意味のあるメッセージを付けてコミットします。
フィーチャーブランチで passenger_flight_alerts_preprocessed.py
コードをビルドし、出力データセットが乗客とフライトアラートの二列マッピングを含むことを確認します。