注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
ユーザーの分析チームが、各乗客の出身国ごとに、個人情報を削除するための追加のロジックも適用した flight_alerts_joined_passengers
データセットを作成するように依頼したとしましょう。例えば、乗客が英国出身の全ての行を flight_alerts_UK
という新しいデータセットに送信するなどです。これをプログラムで実現する一つの方法は、生成トランスフォームを使用することです。これは基本的に、ユーザーの入力をユーザーのロジックに通し、出力を生成するためのfor ループを使用します。生成トランスフォームが実行されるたびに、新しい乗客が入力に追加された場合や新しい乗客-国の組み合わせが導入された場合など、動的にデータセットを追加したり、新たに作成します。
Master
から新しいブランチを作成し、yourName/feature/generated_transform
と名付けます。
リポジトリの Files 内のユーザーの /output
フォルダーを右クリックし、新しいファイル flight_alerts_by_country.py
を追加します。
新しい Python トランスフォームファイルのデフォルトのコードを以下のコードブロックに置き換えます。
from transforms.api import transform_df, Input, Output
from pyspark.sql import functions as F
"""
Define a transform generator function that will create multiple output datasets
This function takes an array of strings (countries) as an input
"""
def transform_generator(countries):
# Initialize an empty array to store each individual transform function (one for each output dataset)
transforms = []
# Loop through the individual strings in the countries array
for country in countries:
# For each country, create an output dataset with the name flight_alerts_COUNTRY - scroll to the end of the output line to see formatting
@transform_df(
Output("/${namespace}/Temporary Training Artifacts/${yourName}/Data Engineering Tutorials/Transform Project: Flight Alerts by Country/data/output/flight_alerts_{country}".format(country=country)),
source_df=Input("${flight_alerts_joined_passengers_RID}"),
)
def filter_by_country(source_df, country=country):
"""
By including "country=country" in the scope of this function we can
capture the value of the country variable so that we can use it within
the code. In this case we will use it to filter the country column.
Note, we are using lowercase strings
"""
filtered_df = source_df.filter(F.lower(F.col('country')) == country)
# Strip columns that won't be needed here. For example, sensitive passenger information
filtered_df = filtered_df.select(
'alert_display_name',
'flight_id',
'passenger_id',
'flight_date',
F.col('priority').alias('alert_priority'),
F.col('status').alias('alert_status'),
F.col('comment').alias('alert_comment'),
F.col('assignee').alias('alert_assignee'),
F.col('flyer_status').alias('passenger_status'),
F.col('country').alias('passenger_country'),
)
# Return the filtered dataframe to complete our individual transform
return filtered_df
# Append the completed transform to our transforms array, then move onto the next item in the for loop
transforms.append(filter_by_country)
# Returns the array of transforms, ready to be run
return transforms
# Feed this list of countries into our transform_generator function we defined above, then run each one
TRANSFORMS = transform_generator([
'brazil',
'canada',
'france',
'germany',
'mexico',
'netherlands',
'uk',
'us',
])
コード内の以下の行を置き換えます:
${namespace}
をユーザーの名前空間で置き換えます${yourName}
をユーザーの /Tutorial Practice Artifacts
フォルダー名で置き換えます${flight_alerts_joined_passengers_RID}
を前のタスクからの 変換 出力の RID で置き換えます(これはユーザーのリポジトリの flight_alerts_joined_passengers.py
ファイルにあります)。Preview ボタンをクリックします。コードの構造により、ユーザーは8つの可能な出力の中から一つのプレビューを選択するように求められます(transform_generator
によりコードの57行目で定義されている8つの国があります)。ドロップダウンで任意の filter_by_country
値を選択し、プレビューを実行します。
プレビュー結果が passenger_country
の値が選択した filter_by_country
値と等しいレコードだけを含むことを確認します。例えば、 filter_by_country (2)
を選択した場合、結果はコードの57行目にある transform_generator
の二番目の値、すなわち canada
に対応します。
ℹ️ 前のステップでは、ユーザーの 入力 が存在しないブランチでコードをプレビューしました — yourName/feature/generated_transform
。Fallback branches の概念を通じて、Foundry のビルドプロセス(およびプレビューオプション)は、現在のブランチに対応するブランチが見つからない場合、入力の Master
ブランチに "fallback" します。ユーザーはリポジトリの Settings → Branches → Fallback Branches で連続する fallback branch 挙動を定義することもできます。fallback branches についての詳細は こちら をご覧ください。
意味のあるメッセージ(例: "feature: add generated output")でコードをコミットします。
ブランチ上でコードをビルドし、各国ごとに8つの別々のデータセットがユーザーの .../Transform Project: Flight Alert Metrics/datasets/output/
フォルダーに作成されることを確認します。
ビルドが成功したら、PRプロセスを完了し、ブランチを Master
にマージします(必要に応じてマージ後にブランチを削除できます)。
Master
ブランチ上でコードをビルドします。
トランスフォームの生成は高度なトピックであり、追加の文脈を得るために 関連するドキュメンテーション を読み進めることを強くお勧めします。