5A. [Repositories] Code Repositories での生ファイルの取り扱い5 - ユーザーのプロジェクトに生データソースをコピーする

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

5 - ユーザーのプロジェクトに生データソースをコピーする

トレーニングのため、基本的な PySpark コードを使用して、passenger_flight_alerts_csv_rawpassengers_json_raw データセットのコピーをユーザーのデータソースプロジェクトフォルダーに作成します。これは、データ変換の導入 チュートリアルで生のフライトアラートとマッピングデータセットと同じように行います。結果として、ユーザーのパイプラインの乗客データソースセグメントの専用の開始点が得られますが、通常はデータ接続同期でソースファイルを単純に取り込み、同期したデータセットをユーザーの入力として使用します。

生のファイルを扱っているため、単純な PySpark アイデンティティ変換では十分ではありません。@transform() デコレーターや独自の Python ライブラリに依存する特異なコードを実装する必要があります。

⚠️ データソースが存在する Foundry Training and Resources プロジェクトではなく、出力を生成するため、変換ファイル内のデータソースパスを必要に応じて調整し、ここで説明されているプロセスを使用して プロジェクト参照 を作成する必要があります。

🔨 タスクの説明

  1. ユーザーの画面の左上にあるコードリポジトリの ファイル セクションでフォルダー構造を展開し、/src の下の /datasets フォルダーを右クリックして、その中に /raw という新しいフォルダーを作成します。

  2. 新しく作成した /raw フォルダーに次の2つの Python ファイルを作成します:

    • passengers_raw.py
    • passenger_flight_alerts_raw.py
  3. passenger_flight_alerts_raw.py ファイルを開き、出力ファイルパスのルートにある大文字と小文字を区別する ネームスペース をメモします。例: Output("/thisIsTheNamespace/Foundry Training and Resources/...

  4. エディター内の すべて のデフォルトコードを下記のコードブロックで置き換えます。なお、入力パスと出力パスをトレーニングプロジェクトのパスとユーザーのチュートリアル作成物フォルダーにそれぞれ合わせる必要があることに注意してください。

    
    from transforms.api import transform, Input, Output
    from shutil import copyfileobj
    
    
    @transform(
        output=Output("/${namespace}/Temporary Training Artifacts/${yourName}/Data Engineering Tutorials/Datasource Project: Passengers/data/raw/passenger_flight_alerts_raw"),
        raw_file_input=Input("/${namespace}/Foundry Training and Resources/Example Projects/[Datasource] Passengers/datasets/raw/passenger_flight_alerts_csv_raw"),
    )
    def compute(output, raw_file_input):
        """
        Create variables for the filesystems of the input and output datasets
        """
        input_filesystem = raw_file_input.filesystem()
        output_filesystem = output.filesystem()
    
        """
        This function takes a row of file metadata as input
        This function copies the file from the input dataset to the output dataset
        """
        def copy_file_without_doing_anything(files_df_row):
            with input_filesystem.open(files_df_row.path, 'rb') as f:
                with output_filesystem.open(files_df_row.path, 'wb') as o:
                    copyfileobj(f, o)
    
        """
        Create a dataframe containing paths and other file metadata for everything in the input dataset filesystem
        In this dataframe, each row represents a single raw file from the input dataset
        """
        files_df = input_filesystem.files()
    
        """
        Runs the copy_file_without_doing_anything in parallel on Spark
        This code will scale well because it leverages Spark rather than running on the driver
        """
        files_df.foreach(copy_file_without_doing_anything)
    
  5. コードをコピーしたら、ステップ4で記録したネームスペースに合わせて、6行目と7行目の ${namespace} を置き換えます。

  6. 6行目の ${yourName}ユーザーの /Tutorial Practice Artifacts フォルダーの名前に置き換えます(例:.../Foundry Reference Project/Tutorial Practice Artifacts/jmeier/...)。

  7. passengers_raw.py ファイルでステップ3-6を繰り返し、デフォルトのコードを下記のコードブロックに置き換えます。

    
    from transforms.api import transform, Input, Output
    from shutil import copyfileobj
    
    
    @transform(
        output=Output("/${namespace}/Temporary Training Artifacts/${yourName}/Data Engineering Tutorials/Datasource Project: Passengers/data/raw/passengers_raw"),
        raw_file_input=Input("/${namespace}/Foundry Training and Resources/Example Projects/[Datasource] Passengers/datasets/raw/passengers_json_raw"),
    )
    def compute(output, raw_file_input):
        """
        Create variables for the filesystems of the input and output datasets
        """
        input_filesystem = raw_file_input.filesystem()
        output_filesystem = output.filesystem()
    
        """
        This function takes a row of file metadata as input
        This function copies the file from the input dataset to the output dataset
        """
        def copy_file_without_doing_anything(files_df_row):
            with input_filesystem.open(files_df_row.path, 'rb') as f:
                with output_filesystem.open(files_df_row.path, 'wb') as o:
                    copyfileobj(f, o)
    
        """
        Create a dataframe containing paths and other file metadata for everything in the input dataset filesystem
        In this dataframe, each row represents a single raw file from the input dataset
        """
        files_df = input_filesystem.files()
    
        """
        Runs the copy_file_without_doing_anything in parallel on Spark
        This code will scale well because it leverages Spark rather than running on the driver
        """
        files_df.foreach(copy_file_without_doing_anything)
    
    

ℹ️ このチュートリアルでは、コピーしたコードの詳細は説明しませんが、コードのコメントを読んだり、Python の生ファイルアクセスに関するドキュメンテーションを参照したりすることは自由です。

  1. ユーザーの画面の右上にある プレビュー ボタンをクリックして出力をプレビューします。プレビューウィンドウで 入力ファイルの設定 を求められた場合は、設定... をクリックし、次の画面で passenger_flight_alerts.csv の隣にあるボックスをチェックします。

    • ℹ️ プレビュー機能が自動的に動作しない場合は、Code Assist を再読み込みするか、ブラウザを再読み込みしてください。
  2. 次に、右側の青い 保存してプレビュー ボタンをクリックします。プレビューの出力は、行と列の一連のものではなく、単に生の CSV ファイルのコピーが生成されていることを確認するだけです。

  3. 意味のあるメッセージ(例:feature: add raw transform files)でコードをコミットします。

  4. フィーチャーブランチ上で両方のデータセットをビルドします。

  5. ビルドが完了したら、両方の変換ファイルのデータセットパスを、前のチュートリアルで説明されているプロセスを使用して RID に置き換えることを検討してみてください。

    • コードエディターで パスを RID に置き換える インジケーターが表示される前に、ブラウザを再読み込みする必要があること(a)と、コードを再コミットする必要があること(b)を覚えておいてください。
  6. 前のチュートリアルで説明されている PR プロセスを使用して、ユーザーのコードを Master ブランチにマージします。

  7. Master ブランチで両方の生出力をビルドします。