ドキュメントの検索
karat

+

K

APIリファレンス ↗

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

3 - ユーザーのコードの作成、プレビュー、ビルド

📖 タスクの概要

いくつかの小さな違いはありますが、今回は前のチュートリアルでカバーしたプロセスを用いて、1) このクリーニングステップのコードを作成し、2) 出力をプレビューし、3) ユーザーのブランチでビルドします。クリーニングステップの主要な特徴は、マッピングファイルをフライトアラートデータセットに結合し、数値の優先度とステータス値を文字列に置き換えるために使用できるようにすることです。つまり、321 の代わりに LowMediumHighpriority 値を持つ豊かな flight_alerts データセットを求めています。そのため、PySpark の変換は複数の入力を使用するという特異な構造を持つことになります。

🔨 タスクの説明

  1. 新しく作成した flight_alerts_clean.py ファイルを開きます。
  2. 1行目からコメント文字 (#) を削除して、ユーザーのコードが pyspark.sqlfunctions モジュールをインポートするようにします。
  3. 追加の入力を追加するため、7行目で source_df をエイリアスとして使用するのではなく、代わりに flight_alerts を入力します。
  4. 7行目の SOURCE_DATASET_PATHflight_alerts_preprocessedRID に置き換えます。これは、ユーザーの flight_alerts_preprocessed.py ファイルで定義された Output から取得できます。
  5. フライトアラートの入力行のすぐ下に、priority_mappingstatus_mapping の入力を示す2つの新しい、カンマで区切られた行を作成します。それぞれには priority_mapping_preprocessedstatus_mapping_preprocessed から取得した RID を供給します。以下のサンプルを参照してください。 この例では、イラストレーションの目的のために、Output パスと RID が切り捨てられています。最後の入力の後にもカンマを残すことを忘れないでください。
@transform_df(
    Output("../Temporary Training Artifacts/..."),  # 出力パスを指定します
    flight_alerts=Input("ri.foundry.main.dataset..."),  # flight_alertsという名前のデータセットを入力として指定します
    priority_mapping=Input("ri.foundry.main.dataset..."),  # priority_mappingという名前のデータセットを入力として指定します
    status_mapping=Input("ri.foundry.main.dataset..."),  # status_mappingという名前のデータセットを入力として指定します
)  
  1. @transform_df デコレータの閉じ括弧 ) の後に続く関数定義(つまり、全て)を以下のコードブロックに置き換えてください。コードのロジックについては次のタスクで簡単にレビューします。
  2. 変換コードが正しく動作することを確認するために、プレビューを実行します。プレビューが完了すると、下部のプレビューヘルパーの左側にユーザーの三つの入力が表示されます。
  3. プレビュー中に発生した可能性のある問題(例えば、間違ったコピーペーストが原因で生じたスペースの誤りや軽微な構文エラー)を修正し、画面の右上部にあるコミットボタンをクリックします。コミットメッセージには、「feature: add clean dataset」のような意味のあるメッセージを考えてみてください。 ℹ️ 未コミットの変更がある状態でビルドをクリックすると、ユーザーのリポジトリは自動的にユーザーのブランチにコードをコミットし、一般的なコミットメッセージが生成されます。より役立つメッセージを生成するために、ビルドの前に明示的にコミットを使用することをお勧めします。
  4. ビルドボタンをクリックして、ユーザーのブランチ上でユーザーのコードを実行し、ユーザーのデータセットをビルドします。CIチェックとビルドプロセスには数分かかるので、その間に次のタスクに進んでください。

コードブロック


def compute(flight_alerts, priority_mapping, status_mapping):

    # priority_mapping データフレームを結合用に準備します
    priority_mapping = priority_mapping.select(
        F.col('value').alias('priority'),
        F.col('mapped_value').alias('priority_value')
    )

    # status_mapping データフレームを結合用に準備します
    status_mapping = status_mapping.select(
        F.col('value').alias('status'),
        F.col('mapped_value').alias('status_value')
    )

    # flight_alerts を priority_mapping および status_mapping と結合し、priority と status の人間が読める名前を取得します
    df = flight_alerts.join(priority_mapping, on='priority', how='left')
    df = df.join(status_mapping, on='status', how='left')

    # 結合後の列を選択します
    df = df.select(
        'alert_display_name',
        'flight_id',
        'flight_display_name',
        'flight_date',
        'rule_id',
        'rule_name',
        'category',
        F.col('priority_value').alias('priority'),
        F.col('status_value').alias('status'),
    )

    # 未来のワークフローでコメントとユーザ名を保存するための空のプレースホルダ列を追加します
    # `None` は型がないため、キャストが必要です
    df = df.withColumn('comment', F.lit(None).cast('string'))
    df = df.withColumn('assignee', F.lit(None).cast('string'))

    return df