This content is also available at learn.palantir.com ↗ and is presented here for accessibility purposes.
With some minor variation, you’ll now use processes covered in the previous tutorial to 1) create the code for this cleaning step, 2) preview the output, and 3) build it on your branch. A major feature of the cleaning step will be joining the mapping files to your flight alerts dataset so that they can be used to replace the numeric priority and status values with their string equivalents. In other words, we want an enriched flight_alerts dataset with priority values of Low, Medium, and High instead of 3, 2, and 1. Because it reads the flight alerts and both mapping files, the PySpark transform will have a distinct structure that involves multiple inputs.
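As a rough illustration of the intended result, the plain-Python sketch below treats the priority mapping file as rows of value/mapped_value pairs (the column names the transform later in this task reads) and looks up each alert's numeric priority in it. The sample rows are hypothetical, and plain Python stands in for PySpark here only to show the lookup; the actual transform performs it with a join.

```python
# Hypothetical rows from the priority mapping file; the column names
# 'value' and 'mapped_value' match those used by the transform in this task.
priority_mapping = [
    {"value": 1, "mapped_value": "High"},
    {"value": 2, "mapped_value": "Medium"},
    {"value": 3, "mapped_value": "Low"},
]

# Hypothetical flight alerts that still carry numeric priorities.
flight_alerts = [
    {"alert_display_name": "Alert A", "priority": 3},
    {"alert_display_name": "Alert B", "priority": 1},
]

# Build a lookup from each numeric code to its human-readable label.
lookup = {row["value"]: row["mapped_value"] for row in priority_mapping}

# Replace each numeric priority with its label (None if the code is unmapped).
enriched = [
    {**alert, "priority": lookup.get(alert["priority"])} for alert in flight_alerts
]

print(enriched)
```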
1. Create a new file called flight_alerts_clean.py.
2. Remove the comment character (#) from line 1 so that your code imports the functions module from pyspark.sql.
3. Because this transform takes multiple inputs, the generic source_df alias (on line 7) will no longer suffice. Type flight_alerts in its place.
4. Set the flight_alerts input to the RID of flight_alerts_preprocessed, which you can obtain from the Output defined in your flight_alerts_preprocessed.py file.
5. Add priority_mapping and status_mapping inputs, supplying the RIDs from priority_mapping_preprocessed and status_mapping_preprocessed respectively, as shown in the sample below.
Note that the Output paths and RIDs in this example are truncated for illustration purposes. Remember to leave a comma after the final input in your list.
```python
@transform_df(
    Output("../Temporary Training Artifacts/..."),
    flight_alerts=Input("ri.foundry.main.dataset..."),
    priority_mapping=Input("ri.foundry.main.dataset..."),
    status_mapping=Input("ri.foundry.main.dataset..."),
)
```
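The keyword names in this decorator matter: each one (flight_alerts, priority_mapping, status_mapping) has to match a parameter name of the compute function, which is why the template's source_df alias has to change. The plain-Python sketch below uses a hypothetical toy_transform_df, which is not part of the Foundry transforms API and imitates only this name matching. The inputs are placeholder strings rather than datasets, and the real decorator's error reporting may differ.

```python
import inspect


def toy_transform_df(output, **inputs):
    """Hypothetical stand-in for @transform_df (NOT the Foundry API).

    It checks that every keyword input has a same-named parameter on the
    decorated function, then returns a callable that passes each input to
    the parameter with the matching name.
    """
    def decorator(compute):
        params = set(inspect.signature(compute).parameters)
        unmatched_inputs = sorted(set(inputs) - params)
        unmatched_params = sorted(params - set(inputs))
        if unmatched_inputs or unmatched_params:
            raise TypeError(
                f"unmatched inputs {unmatched_inputs}, "
                f"unmatched parameters {unmatched_params}"
            )

        def run():
            # Inputs are bound to parameters by name, not by position.
            return {output: compute(**inputs)}

        return run

    return decorator


@toy_transform_df(
    "clean_output",
    flight_alerts="alerts data",
    priority_mapping="priority data",
    status_mapping="status data",
)
def compute(flight_alerts, priority_mapping, status_mapping):
    return (flight_alerts, priority_mapping, status_mapping)


result = compute()
print(result)

# Keeping the template's source_df parameter would no longer line up.
try:
    @toy_transform_df("clean_output", flight_alerts="alerts data")
    def stale_compute(source_df):
        return source_df
except TypeError as err:
    mismatch_error = str(err)
    print(mismatch_error)
```

In this toy, the mismatch is caught as soon as the decorator is applied, which mirrors the tutorial's point that renaming only one side (the Input keyword or the function parameter) is not enough.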
Finally, replace the existing compute function, which follows the closing parenthesis of the @transform_df decorator, with the code block below.
We'll perform a brief review of the code logic in the next task.
```python
# F refers to pyspark.sql.functions, imported on line 1 of the file.
def compute(flight_alerts, priority_mapping, status_mapping):
    # prepare priority_mapping dataframe for joining
    priority_mapping = priority_mapping.select(
        F.col('value').alias('priority'),
        F.col('mapped_value').alias('priority_value')
    )

    # prepare status_mapping dataframe for joining
    status_mapping = status_mapping.select(
        F.col('value').alias('status'),
        F.col('mapped_value').alias('status_value')
    )

    # join flight_alerts to priority_mapping and status_mapping to get human readable names for priority and status
    df = flight_alerts.join(priority_mapping, on='priority', how='left')
    df = df.join(status_mapping, on='status', how='left')

    # select columns after join
    df = df.select(
        'alert_display_name',
        'flight_id',
        'flight_display_name',
        'flight_date',
        'rule_id',
        'rule_name',
        'category',
        F.col('priority_value').alias('priority'),
        F.col('status_value').alias('status'),
    )

    # add empty placeholder columns for storing comments and usernames in future workflows
    # note necessary cast since `None` is typeless
    df = df.withColumn('comment', F.lit(None).cast('string'))
    df = df.withColumn('assignee', F.lit(None).cast('string'))

    return df
```
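To make the join semantics concrete, here is a plain-Python sketch (not PySpark) that mirrors compute() on a couple of hypothetical rows. The left_join helper is written for illustration only. It shows that a left join keeps every alert, so an alert whose code has no mapping ends up with an empty (None) label rather than being dropped. The status codes and labels are invented, since this task does not list them.

```python
def left_join(left, right, on):
    """Left-join two lists of dicts on column `on`.

    Every left row is kept; right-hand columns are None when no key matches.
    Assumes keys are unique on the right (a Spark join would instead emit
    one row per match).
    """
    right_by_key = {row[on]: row for row in right}
    right_cols = [col for col in (right[0] if right else {}) if col != on]
    joined = []
    for row in left:
        match = right_by_key.get(row[on], {})
        joined.append({**row, **{col: match.get(col) for col in right_cols}})
    return joined


# Mapping rows after the select()/alias() renames in compute().
# Priority labels follow this task; status codes and labels are hypothetical.
priority_mapping = [
    {"priority": 1, "priority_value": "High"},
    {"priority": 2, "priority_value": "Medium"},
    {"priority": 3, "priority_value": "Low"},
]
status_mapping = [
    {"status": 0, "status_value": "Open"},
    {"status": 1, "status_value": "Closed"},
]

# Hypothetical alerts; priority 9 deliberately has no mapping.
flight_alerts = [
    {"alert_display_name": "Alert A", "priority": 2, "status": 0},
    {"alert_display_name": "Alert B", "priority": 9, "status": 1},
]

df = left_join(flight_alerts, priority_mapping, on="priority")
df = left_join(df, status_mapping, on="status")

# Mirror the final select() plus the two placeholder columns: the labels take
# over the original column names, and comment/assignee start out empty.
result = [
    {
        "alert_display_name": row["alert_display_name"],
        "priority": row["priority_value"],
        "status": row["status_value"],
        "comment": None,
        "assignee": None,
    }
    for row in df
]

for row in result:
    print(row)
```

Because both joins are left joins, the cleaned dataset keeps the same number of alerts as its input, provided each code appears at most once in its mapping file.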