This content is also available at learn.palantir.com and is presented here for accessibility purposes.
For training purposes, you’ll use some basic PySpark code to create a copy of the passenger_flight_alerts_csv_raw and passengers_json_raw datasets in your datasource project folder, as you did in the Introduction to Data Transformation tutorial with the raw flight alerts and mapping datasets. The result is a dedicated starting point for the passengers datasource segment of your pipeline, though typically you’d simply ingest the source file(s) with a Data Connection sync and use the synchronized dataset as your input.
Because you’re working with raw files, a simple PySpark identity transform won’t suffice. You’ll need to implement some distinctive code that relies on the @transform() decorator and Python file-handling utilities like shutil.
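For contrast, here’s what a minimal identity transform on a tabular dataset looks like; it hands you a DataFrame and returns it unchanged, with no way to touch the underlying raw files. This is a sketch with placeholder paths, not code you need for this tutorial:

from transforms.api import transform_df, Input, Output

@transform_df(
    Output("/placeholder/path/to/output_dataset"),
    source_df=Input("/placeholder/path/to/input_dataset"),
)
def compute(source_df):
    # Works only when the input is already parsed into rows and columns
    return source_df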
⚠️ As you will be producing outputs that are not in the Foundry Training and Resources project where the data sources live, you’ll need to adjust the datasource paths in the transform files accordingly and create the Project References using the process illustrated here when prompted.
Expand the folder structure in the Files section of your code repository near the top left of your screen, right-click the /datasets folder under /src, and create a new folder within it called /raw.
Create two new Python files in your new /raw folder:
passengers_raw.py
passenger_flight_alerts_raw.py
Open the passenger_flight_alerts_raw.py file and make note of the case-sensitive space at the root of the output file path on line 6. For example: Output("/thisIsTheSpace/Foundry Training and Resources/...
Replace all default code in the editor with the code block below. Note that you still need to adapt the Input and Output paths to match the training project path and your tutorial artifacts folder, respectively.
from transforms.api import transform, Input, Output
from shutil import copyfileobj


@transform(
    output=Output("/${space}/Temporary Training Artifacts/${yourName}/Data Engineering Tutorials/Datasource Project: Passengers/data/raw/passenger_flight_alerts_raw"),
    raw_file_input=Input("/${space}/Foundry Training and Resources/Example Projects/[Datasource] Passengers/datasets/raw/passenger_flight_alerts_csv_raw"),
)
def compute(output, raw_file_input):
    """
    Create variables for the filesystems of the input and output datasets
    """
    input_filesystem = raw_file_input.filesystem()
    output_filesystem = output.filesystem()

    """
    This function takes a row of file metadata as input and copies the file
    from the input dataset to the output dataset
    """
    def copy_file_without_doing_anything(files_df_row):
        with input_filesystem.open(files_df_row.path, 'rb') as f:
            with output_filesystem.open(files_df_row.path, 'wb') as o:
                copyfileobj(f, o)

    """
    Create a dataframe containing paths and other file metadata for everything
    in the input dataset filesystem; each row represents a single raw file
    """
    files_df = input_filesystem.files()

    """
    Run copy_file_without_doing_anything in parallel on Spark; this scales well
    because the copies run on Spark executors rather than on the driver
    """
    files_df.foreach(copy_file_without_doing_anything)
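As an aside, each row handed to copy_file_without_doing_anything is file metadata from files_df (only its path attribute is used above). If you ever need to copy just a subset of files, the listing can be narrowed before the copy runs; a minimal sketch, assuming the glob parameter of files() in the transforms API:

# Assumption: files() accepts a glob filter; verify against your platform docs
csv_files_df = input_filesystem.files(glob="**/*.csv")
csv_files_df.foreach(copy_file_without_doing_anything)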
Once you have copied in the code, replace the ${space} on lines 6 and 7 with the space noted in step 4. Replace ${yourName} on line 6 with the name of your /Tutorial Practice Artifacts folder (for example, .../Foundry Reference Project/Tutorial Practice Artifacts/jmeier/...).
Repeat steps 3-6 in the passengers_raw.py file, replacing the default code with the code block below. It is identical to the block above except for the Input and Output paths.
from transforms.api import transform, Input, Output
from shutil import copyfileobj


@transform(
    output=Output("/${space}/Temporary Training Artifacts/${yourName}/Data Engineering Tutorials/Datasource Project: Passengers/data/raw/passengers_raw"),
    raw_file_input=Input("/${space}/Foundry Training and Resources/Example Projects/[Datasource] Passengers/datasets/raw/passengers_json_raw"),
)
def compute(output, raw_file_input):
    """
    Create variables for the filesystems of the input and output datasets
    """
    input_filesystem = raw_file_input.filesystem()
    output_filesystem = output.filesystem()

    """
    This function takes a row of file metadata as input and copies the file
    from the input dataset to the output dataset
    """
    def copy_file_without_doing_anything(files_df_row):
        with input_filesystem.open(files_df_row.path, 'rb') as f:
            with output_filesystem.open(files_df_row.path, 'wb') as o:
                copyfileobj(f, o)

    """
    Create a dataframe containing paths and other file metadata for everything
    in the input dataset filesystem; each row represents a single raw file
    """
    files_df = input_filesystem.files()

    """
    Run copy_file_without_doing_anything in parallel on Spark; this scales well
    because the copies run on Spark executors rather than on the driver
    """
    files_df.foreach(copy_file_without_doing_anything)
ℹ️ This tutorial will not review the details of the code you just copied in, but you are welcome to read through the code comments and refer to the documentation on Python raw file access as desired.
Preview your output by clicking the Preview button at the top right of your screen. If prompted in the preview window to Configure input files, click Configure... and tick the box next to passenger_flight_alerts.csv on the next screen. Then, click the blue Save and preview button to the right. The output of your preview won’t be a series of rows and columns; it will simply confirm that what’s being generated is a copy of the raw CSV file.
Commit your code with a meaningful message (e.g., feature: add raw transform files).
Build both datasets on your feature branch.
Once the builds complete, consider swapping out the dataset paths for the RIDs in both transform files using the process described in a previous tutorial.
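For reference, a dataset RID replaces the path string directly inside Input() and Output(). A hypothetical sketch (the RIDs below are made-up placeholders; copy the real ones from each dataset’s page):

from transforms.api import transform, Input, Output

@transform(
    # Placeholder RIDs for illustration only; substitute your datasets' actual RIDs
    output=Output("ri.foundry.main.dataset.00000000-0000-0000-0000-000000000000"),
    raw_file_input=Input("ri.foundry.main.dataset.11111111-1111-1111-1111-111111111111"),
)
def compute(output, raw_file_input):
    ...  # the copy logic is unchanged; only the dataset references differ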
Merge your code into the Master branch using the PR process described in previous tutorials.
Build both raw outputs on the Master branch.