5A. [Repositories] Code Repositories での生ファイルの取り扱い7 - データ前処理 パート1

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

7 - データ前処理 パート1

📖 タスクの概要

Datasource Project: Flight Alerts プロジェクトのデータセットと同様に、passengers と passenger_flight_alerts(乗客をフライトアラートにマッピングする)データセットにも前処理ステップが必要です。予想通り、最初のタスクはデータセットファイルを読み込み、Spark DataFrames にパースして、最適化された Parquet 形式に書き込むことです。

Parquet は、列名に特殊文字やスペースを許可していないため、前処理ステップでは列名のサニタイズも行う必要があります。Foundry は、transforms.verbs.dataframes パッケージの sanitize_schema_for_parquet など、コードベースの変換を効率化するための API やパッケージを提供しています。これにより、DataFrame の列名から Parquet ファイルとして保存できない文字が削除され、データセットのビルドが正常に行われるようになります。

🔨 タスクの説明

  1. まず、Master から yourName/feature/tutorial_preprocessed_files という名前の新しいブランチを作成します。

  2. コードリポジトリの Files セクションで /datasets フォルダーを右クリックし、新しいフォルダー /preprocessed を作成します。

  3. /preprocessed フォルダーに passenger_flight_alerts_preprocessed.py という名前の新しい Python ファイルを作成します。

  4. passenger_flight_alerts_preprocessed.py ファイルを開き、デフォルトの内容を以下のコードブロックに置き換えます。

    from transforms.api import transform, Input, Output
    from transforms.verbs.dataframes import sanitize_schema_for_parquet
    
    
    @transform(
        parsed_output=Output("/${namespace}/Temporary Training Artifacts/${yourName}/Data Engineering Tutorials/Datasource Project: Passengers/datasets/preprocessed/passenger_flight_alerts_preprocessed"),
        raw_file_input=Input("${passenger_flight_alerts_csv_raw_RID}"),
    )
    def read_csv(ctx, parsed_output, raw_file_input):
    
        # Create a variable for the filesystem of the input datasets
        filesystem = raw_file_input.filesystem()
    
        # Create a variable for the hadoop path of the files in the input dataset
        hadoop_path = filesystem.hadoop_path
    
        # Create an array of the absolute path of each file in the input dataset
        paths = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
    
        # Create a Spark dataframe from all of the CSV files in the input dataset
        df = (
            ctx
            .spark_session
            .read
            .option("encoding", "UTF-8")  # UTF-8 is the default
            .option("header", True)
            .option("inferSchema", True)
            .csv(paths)
        )
    
        """
        Write the dataframe to the output dataset using the sanitize_schema_for_parquet function
        to make sure that the column names don't contain any special characters that would break the
        output parquet file
        """
        parsed_output.write_dataframe(sanitize_schema_for_parquet(df))
    
  5. 以下を置き換えます。

    • 6行目の ${namespace}ユーザーの ネームスペースに置き換えます。
    • 6行目の ${yourName}ユーザーの Tutorial Practice Artifacts フォルダー名に置き換えます。
    • 7行目の ${passenger_flight_alerts_raw_RID} を、passenger_flight_alerts_csv_raw データセットの RID に置き換えます。これは、passenger_flight_alerts_raw.py で定義された出力です。前のタスクでパスを RID に置き換えた場合、そのファイルから RID をコピーしてこのファイルに貼り付けることができます。
    • それ以外の場合は、以下に示すワークフローを使用して、Foundry Explorer ヘルパーから RID を取得できます。

  6. 右上の Preview ボタンを使用して、コードをテストし、出力がデータセットとして表示されることを確認します。プレビューウィンドウで Configure input files と表示された場合は、Configure... をクリックし、次の画面で passenger_flight_alerts.csv の横にあるボックスにチェックを入れます。

  7. テストが期待通りに機能した場合は、意味のあるメッセージ(例:「feature: add passenger_flight_alerts_preprocessed」)を付けてコードをコミットします。

  8. フィーチャーブランチで passenger_flight_alerts_preprocessed.py コードをビルドし、出力データセットに乗客とフライトアラートの2列マッピングが含まれていることを確認します。