注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
learn.palantir.com でも以下の内容をご覧いただけますが、アクセシビリティの観点から、ここに掲載しています。
前のチュートリアルでカバーされたプロセスをいくつかのマイナーな変更を加えながら、1) このクリーニングステップのコードを作成し、2) 出力をプレビューし、3) ブランチにビルドします。クリーニングステップの主な機能は、マッピングファイルをフライトアラートデータセットに結合して、数値の優先度とステータス値を文字列の同等物に置き換えるために使用できるようにすることです。つまり、priority
値が Low
、Medium
、High
である flight_alerts
データセットを 3
、2
、1
の代わりに作成したいと考えています。したがって、PySpark 変換では、複数の入力を使用することに関連する独自の構造が必要になります。
flight_alerts_clean.py
ファイルを開きます。#
)を削除して、pyspark.sqlから functions
モジュールをインポートできるようにします。source_df
を別名として使用するのでは十分ではありません。代わりに flight_alerts
と入力します。flight_alerts_preprocessed
の RID に置き換えます。これは、flight_alerts_preprocessed.py
ファイルで定義された Output から取得できます。priority_mapping
および status_mapping
入力用の2つの新しい、カンマで区切られた行を作成し、それぞれ priority_mapping_preprocessed
および status_mapping_preprocessed
からの RID を提供します。以下のサンプルに示すようにします。
この例の Output パスと RID は、説明のために切り捨てられています。リストの最後の入力の後にカンマを残しておくことを忘れないでください。Copied!1 2 3 4 5 6
@transform_df( Output("../Temporary Training Artifacts/..."), # 出力先の一時トレーニングアーティファクト flight_alerts=Input("ri.foundry.main.dataset..."), # フライトアラートの入力データセット priority_mapping=Input("ri.foundry.main.dataset..."), # プライオリティマッピングの入力データセット status_mapping=Input("ri.foundry.main.dataset..."), # ステータスマッピングの入力データセット )
@transform_df
デコレータの閉じ括弧 )
の後のすべて)を以下のコードブロックに置き換えます。
次のタスクでコードロジックの簡単なレビューを行います。コードブロック
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
def compute(flight_alerts, priority_mapping, status_mapping): # priority_mappingデータフレームを結合のために準備 priority_mapping = priority_mapping.select( F.col('value').alias('priority'), # 'value'列を'priority'としてエイリアス F.col('mapped_value').alias('priority_value') # 'mapped_value'列を'priority_value'としてエイリアス ) # status_mappingデータフレームを結合のために準備 status_mapping = status_mapping.select( F.col('value').alias('status'), # 'value'列を'status'としてエイリアス F.col('mapped_value').alias('status_value') # 'mapped_value'列を'status_value'としてエイリアス ) # flight_alertsとpriority_mappingおよびstatus_mappingを結合して、優先度とステータスの人間に読みやすい名前を取得 df = flight_alerts.join(priority_mapping, on='priority', how='left') # priority列でleft join df = df.join(status_mapping, on='status', how='left') # status列でleft join # 結合後のカラムを選択 df = df.select( 'alert_display_name', 'flight_id', 'flight_display_name', 'flight_date', 'rule_id', 'rule_name', 'category', F.col('priority_value').alias('priority'), # 'priority_value'列を'priority'としてエイリアス F.col('status_value').alias('status') # 'status_value'列を'status'としてエイリアス ) # 今後のワークフローでコメントやユーザー名を保存するための空のプレースホルダカラムを追加 # Noneは型がないため、必要なキャストを行う df = df.withColumn('comment', F.lit(None).cast('string')) # 'comment'列を追加し、型をstringにキャスト df = df.withColumn('assignee', F.lit(None).cast('string')) # 'assignee'列を追加し、型をstringにキャスト return df # 最終的なデータフレームを返す