
3 - Creating, Previewing, and Building your Code

This content is also available at learn.palantir.com ↗ and is presented here for accessibility purposes.

📖 Task Introduction

With some minor variation, you’ll now use processes covered in the previous tutorial to 1) create the code for this cleaning step, 2) preview the output, and 3) build it on your branch. A major feature of the cleaning step will be joining the mapping files to your flight alerts dataset so that they can be used to replace the numeric priority and status values with their string equivalents. In other words, we want an enriched flight_alerts dataset with priority values of Low, Medium, and High instead of 3, 2, and 1. The PySpark transform will therefore have a distinct structure that involves the use of multiple inputs.
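The enrichment described above can be illustrated with a plain-Python sketch (independent of Foundry and PySpark). The flight IDs and rows below are hypothetical; only the 3/2/1 → Low/Medium/High mapping comes from the tutorial:

```python
# Numeric-to-string priority mapping described above: 3 -> Low, 2 -> Medium, 1 -> High.
priority_map = {3: 'Low', 2: 'Medium', 1: 'High'}

# Hypothetical flight alert rows carrying numeric priority codes.
alerts = [
    {'flight_id': 'FA-001', 'priority': 1},
    {'flight_id': 'FA-002', 'priority': 3},
]

# Replace each numeric code with its string equivalent.
enriched = [{**row, 'priority': priority_map[row['priority']]} for row in alerts]
print(enriched)
# → [{'flight_id': 'FA-001', 'priority': 'High'}, {'flight_id': 'FA-002', 'priority': 'Low'}]
```

In the actual transform, this replacement is done by joining the mapping datasets rather than hard-coding a dictionary, so the mappings stay data-driven.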

🔨 Task Instructions

  1. Open your new flight_alerts_clean.py file.
  2. Remove the comment character (#) from line 1 so that your code imports the functions module from pyspark.sql.
  3. You’re going to be adding additional inputs, so using source_df (on line 7) as an alias will no longer suffice. Type flight_alerts in its place.
  4. Replace SOURCE_DATASET_PATH on line 7 with the RID of flight_alerts_preprocessed, which you can obtain from the Output defined in your flight_alerts_preprocessed.py file.
  5. Just below your flight alerts input line, create two new, comma-separated lines for the priority_mapping and status_mapping inputs, supplying the RIDs from priority_mapping_preprocessed and status_mapping_preprocessed respectively, as shown in the sample below. Note that the Output paths and RIDs in this example are truncated for illustration purposes. Remember to leave a comma after the final input in your list.
@transform_df(
    Output("../Temporary Training Artifacts/..."),
    flight_alerts=Input("ri.foundry.main.dataset..."),
    priority_mapping=Input("ri.foundry.main.dataset..."),
    status_mapping=Input("ri.foundry.main.dataset..."),
)  
  6. Replace the function definition (i.e., everything after the closing ) of the @transform_df decorator) with the code block below. We'll perform a brief review of the code logic in the next task.
  7. Run the Preview to ensure the transform code functions properly. When the preview completes, notice that the left-hand side of the Preview helper at the bottom lists your three inputs.
  8. Fix any issues (e.g., spacing or minor syntax errors from an incorrect copy/paste) that the preview surfaced, and then click the Commit button in the top right of the screen. Consider a meaningful commit message like, “feature: add clean dataset.” ℹ️ If you have uncommitted changes and click Build before Commit, your repository will automatically commit your code to your branch with a generic commit message. We recommend clicking Commit expressly before Build so you can write more helpful messages.
  9. Click the Build button to execute your code and build your dataset on your branch. CI checks and the build process will take a few minutes, so move to the next task in the meantime.

Code Block


def compute(flight_alerts, priority_mapping, status_mapping):

    # prepare priority_mapping dataframe for joining
    priority_mapping = priority_mapping.select(
        F.col('value').alias('priority'),
        F.col('mapped_value').alias('priority_value')
    )

    # prepare status_mapping dataframe for joining
    status_mapping = status_mapping.select(
        F.col('value').alias('status'),
        F.col('mapped_value').alias('status_value')
    )

    # join flight_alerts to priority_mapping and status_mapping to get human readable names for priority and status
    df = flight_alerts.join(priority_mapping, on='priority', how='left')
    df = df.join(status_mapping, on='status', how='left')

    # select columns after join
    df = df.select(
        'alert_display_name',
        'flight_id',
        'flight_display_name',
        'flight_date',
        'rule_id',
        'rule_name',
        'category',
        F.col('priority_value').alias('priority'),
        F.col('status_value').alias('status'),
    )

    # add empty placeholder columns for storing comments and usernames in future workflows
    # note necessary cast since `None` is typeless
    df = df.withColumn('comment', F.lit(None).cast('string'))
    df = df.withColumn('assignee', F.lit(None).cast('string'))

    return df
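A side note on the join logic above: how='left' keeps every flight alert row even when its code has no entry in the mapping dataset; the unmatched value simply comes through as null, much like the comment and assignee placeholder columns start out null. A plain-Python sketch of that left-join behavior (all data values below are hypothetical, not from the tutorial datasets):

```python
# Hypothetical mapping rows, mirroring the prepared priority_mapping dataframe.
priority_mapping = [
    {'priority': '1', 'priority_value': 'High'},
    {'priority': '2', 'priority_value': 'Medium'},
]

# Hypothetical flight alerts; 'FA-002' carries a code with no mapping entry.
alerts = [
    {'flight_id': 'FA-001', 'priority': '1'},
    {'flight_id': 'FA-002', 'priority': '9'},
]

# Simulate a left join on 'priority': every alert row is kept,
# and codes absent from the mapping resolve to None (null).
lookup = {m['priority']: m['priority_value'] for m in priority_mapping}
joined = [{**a, 'priority': lookup.get(a['priority'])} for a in alerts]

print(joined[0]['priority'])  # High
print(joined[1]['priority'])  # None
```

This is why the preview is worth inspecting: null priorities or statuses after the join usually mean a code value is missing from the mapping dataset rather than a bug in the transform.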