注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
いくつかの小さな違いはありますが、今回は前のチュートリアルでカバーしたプロセスを用いて、1) このクリーニングステップのコードを作成し、2) 出力をプレビューし、3) ユーザーのブランチでビルドします。クリーニングステップの主要な特徴は、マッピングファイルをフライトアラートデータセットに結合し、数値の優先度とステータス値を文字列に置き換えるために使用できるようにすることです。つまり、3
、2
、1
の代わりに Low
、Medium
、High
の priority
値を持つ豊かな flight_alerts
データセットを求めています。そのため、PySpark の変換は複数の入力を使用するという特異な構造を持つことになります。
flight_alerts_clean.py
ファイルを開きます。#
) を削除して、ユーザーのコードが pyspark.sql の functions
モジュールをインポートするようにします。source_df
をエイリアスとして使用するのではなく、代わりに flight_alerts
を入力します。flight_alerts_preprocessed
の RID に置き換えます。これは、ユーザーの flight_alerts_preprocessed.py
ファイルで定義された Output から取得できます。priority_mapping
と status_mapping
の入力を示す2つの新しい、カンマで区切られた行を作成します。それぞれには priority_mapping_preprocessed
と status_mapping_preprocessed
から取得した RID を供給します。以下のサンプルを参照してください。
この例では、イラストレーションの目的のために、Output パスと RID が切り捨てられています。最後の入力の後にもカンマを残すことを忘れないでください。@transform_df(
Output("../Temporary Training Artifacts/..."), # 出力パスを指定します
flight_alerts=Input("ri.foundry.main.dataset..."), # flight_alertsという名前のデータセットを入力として指定します
priority_mapping=Input("ri.foundry.main.dataset..."), # priority_mappingという名前のデータセットを入力として指定します
status_mapping=Input("ri.foundry.main.dataset..."), # status_mappingという名前のデータセットを入力として指定します
)
@transform_df
デコレータの閉じ括弧 )
の後に続く関数定義(つまり、全て)を以下のコードブロックに置き換えてください。コードのロジックについては次のタスクで簡単にレビューします。コードブロック
def compute(flight_alerts, priority_mapping, status_mapping):
# priority_mapping データフレームを結合用に準備します
priority_mapping = priority_mapping.select(
F.col('value').alias('priority'),
F.col('mapped_value').alias('priority_value')
)
# status_mapping データフレームを結合用に準備します
status_mapping = status_mapping.select(
F.col('value').alias('status'),
F.col('mapped_value').alias('status_value')
)
# flight_alerts を priority_mapping および status_mapping と結合し、priority と status の人間が読める名前を取得します
df = flight_alerts.join(priority_mapping, on='priority', how='left')
df = df.join(status_mapping, on='status', how='left')
# 結合後の列を選択します
df = df.select(
'alert_display_name',
'flight_id',
'flight_display_name',
'flight_date',
'rule_id',
'rule_name',
'category',
F.col('priority_value').alias('priority'),
F.col('status_value').alias('status'),
)
# 未来のワークフローでコメントとユーザ名を保存するための空のプレースホルダ列を追加します
# `None` は型がないため、キャストが必要です
df = df.withColumn('comment', F.lit(None).cast('string'))
df = df.withColumn('assignee', F.lit(None).cast('string'))
return df