注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

3 - コードの作成、プレビュー、およびビルド

learn.palantir.com でも以下の内容をご覧いただけますが、アクセシビリティの観点から、ここに掲載しています。

📖 タスクの概要

前のチュートリアルでカバーされたプロセスをいくつかのマイナーな変更を加えながら、1) このクリーニングステップのコードを作成し、2) 出力をプレビューし、3) ブランチにビルドします。クリーニングステップの主な機能は、マッピングファイルをフライトアラートデータセットに結合して、数値の優先度とステータス値を文字列の同等物に置き換えるために使用できるようにすることです。つまり、priority 値が LowMediumHigh である flight_alerts データセットを 321 の代わりに作成したいと考えています。したがって、PySpark 変換では、複数の入力を使用することに関連する独自の構造が必要になります。

🔨 タスクの説明

  1. 新しい flight_alerts_clean.py ファイルを開きます。
  2. 1行目のコメント文字(#)を削除して、pyspark.sqlから functions モジュールをインポートできるようにします。
  3. 追加の入力を追加するため、7行目の source_df を別名として使用するのでは十分ではありません。代わりに flight_alerts と入力します。
  4. 7行目の SOURCE_DATASET_PATH を、flight_alerts_preprocessedRID に置き換えます。これは、flight_alerts_preprocessed.py ファイルで定義された Output から取得できます。
  5. フライトアラート入力行のすぐ下に、priority_mapping および status_mapping 入力用の2つの新しい、カンマで区切られた行を作成し、それぞれ priority_mapping_preprocessed および status_mapping_preprocessed からの RID を提供します。以下のサンプルに示すようにします。 この例の Output パスと RID は、説明のために切り捨てられています。リストの最後の入力の後にカンマを残しておくことを忘れないでください。
Copied!
1 2 3 4 5 6 @transform_df( Output("../Temporary Training Artifacts/..."), # 出力先の一時トレーニングアーティファクト flight_alerts=Input("ri.foundry.main.dataset..."), # フライトアラートの入力データセット priority_mapping=Input("ri.foundry.main.dataset..."), # プライオリティマッピングの入力データセット status_mapping=Input("ri.foundry.main.dataset..."), # ステータスマッピングの入力データセット )
  1. 関数定義(つまり、@transform_df デコレータの閉じ括弧 ) の後のすべて)を以下のコードブロックに置き換えます。 次のタスクでコードロジックの簡単なレビューを行います。
  2. Preview を実行してトランスフォームコードが正しく機能するか確認します。プレビューが完了したら、画面下部のPreviewヘルパーの左側に 3 個の入力がリストされていることに注目してください。
  3. プレビュー中に表示された問題(たとえば、スペーシングやコピー/ペーストの間違いによる軽微な構文エラーなど)を修正し、画面右上の Commit ボタンをクリックします。「feature: add clean dataset」のような意味のあるコミットメッセージを考えてください。 ℹ️ 未コミットの変更があり、Commit の前に Build をクリックすると、リポジトリは自動的にユーザーのブランチにコードを汎用的なコミットメッセージでコミットします。より役立つメッセージを生成できるように、明示的に Build の前に Commit を使用することをお勧めします。
  4. Build ボタンをクリックしてコードを実行し、ブランチ上にデータセットを構築します。CI チェックとビルドプロセスには数分かかるため、その間に次のタスクに進みます。

コードブロック

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 def compute(flight_alerts, priority_mapping, status_mapping): # priority_mappingデータフレームを結合のために準備 priority_mapping = priority_mapping.select( F.col('value').alias('priority'), # 'value'列を'priority'としてエイリアス F.col('mapped_value').alias('priority_value') # 'mapped_value'列を'priority_value'としてエイリアス ) # status_mappingデータフレームを結合のために準備 status_mapping = status_mapping.select( F.col('value').alias('status'), # 'value'列を'status'としてエイリアス F.col('mapped_value').alias('status_value') # 'mapped_value'列を'status_value'としてエイリアス ) # flight_alertsとpriority_mappingおよびstatus_mappingを結合して、優先度とステータスの人間に読みやすい名前を取得 df = flight_alerts.join(priority_mapping, on='priority', how='left') # priority列でleft join df = df.join(status_mapping, on='status', how='left') # status列でleft join # 結合後のカラムを選択 df = df.select( 'alert_display_name', 'flight_id', 'flight_display_name', 'flight_date', 'rule_id', 'rule_name', 'category', F.col('priority_value').alias('priority'), # 'priority_value'列を'priority'としてエイリアス F.col('status_value').alias('status') # 'status_value'列を'status'としてエイリアス ) # 今後のワークフローでコメントやユーザー名を保存するための空のプレースホルダカラムを追加 # Noneは型がないため、必要なキャストを行う df = df.withColumn('comment', F.lit(None).cast('string')) # 'comment'列を追加し、型をstringにキャスト df = df.withColumn('assignee', F.lit(None).cast('string')) # 'assignee'列を追加し、型をstringにキャスト return df # 最終的なデータフレームを返す