Lightweight transform examples

In addition to speed, lightweight transforms are versatile: they make no assumptions about your choice of compute engine. The following content showcases how various compute engines can be leveraged while relying on the Lightweight API and the Transforms API. Afterward, we demonstrate how to run highly customized, non-Python data processing applications by supplying our own Docker image containing the necessary environment.

Modern single-node compute engines

The basic principle behind all of the following integrations is that we can access our tabular Foundry datasets in multiple formats, such as a pandas DataFrame, an Arrow Table, a Polars DataFrame, or even raw Parquet or CSV files, as shown in the Lightweight API. When saving tables from memory back to Foundry, we can pass them to write_table in any of the formats in which we read them.
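
For instance, a minimal sketch of a transform that reads the input as a Polars DataFrame and writes the result straight back with write_table might look as follows (the dataset names and the column used for filtering are placeholders):

import polars as pl

from transforms.api import lightweight, transform, Output, Input


@lightweight
@transform(my_input=Input('my-input'), my_output=Output('my-output'))
def my_polars_transform(my_input, my_output):
    # Read the Foundry dataset as a polars.DataFrame
    df = my_input.polars()

    # Filter with the Polars expression API; write_table accepts the Polars
    # DataFrame directly, just like a pandas DataFrame or an Arrow Table
    my_output.write_table(df.filter(pl.col('name').str.starts_with('John')))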

Using Ibis

Most modern compute engines embrace the notion of disaggregated data systems and thus operate on industry-standard open-source software. The de facto standard for storing tables in memory is Arrow ↗. To start, we will use Ibis ↗ (with the DuckDB backend), which uses the Arrow format internally. In this case, we can read the Foundry dataset as a pyarrow.Table ↗ object through a my_input.arrow() call, transform it, and then write the transformed object back to Foundry with my_output.write_table(...). Consider the following example snippet:

import ibis
from transforms.api import lightweight, transform, Output, Input


@lightweight
@transform(my_input=Input('my-input'), my_output=Output('my-output'))
def my_ibis_transform(my_input, my_output):
    ibis_client = ibis.duckdb.connect(':memory:')

    # Get the Foundry dataset as a pyarrow.Table object
    table = ibis_client.read_in_memory(my_input.arrow())

    # Execute the data transformation
    results = (
        table
        .filter(table['name'].like('John%'))
        .execute()
    )

    # Save the result to a Foundry dataset
    my_output.write_table(results)

Working with Apache DataFusion and DuckDB

Sometimes, it is easier to cut out the data deserialization step and directly pass the raw underlying files of the datasets to our compute engine. We can get the path to the files on disk (which are downloaded on demand) by calling my_input.path(). When it comes to writing raw files back to Foundry, we have two limitations to keep in mind:

  • Only Parquet files can be stored in Foundry datasets through this API.
  • Files must be placed in the folder whose path is given by my_output.path_for_write_table.

When both criteria are met, we can call write_table with the path to this folder, like so: my_output.write_table(my_output.path_for_write_table). To see this in action, consider the following snippet demonstrating how to use DataFusion ↗ in-platform.

import datafusion
from datafusion import lit
from datafusion.functions import col, starts_with
from transforms.api import lightweight, transform, Output, Input


@lightweight
@transform(my_input=Input('my-input'), my_output=Output('my-output'))
def my_datafusion_transform(my_input, my_output):
    ctx = datafusion.SessionContext()

    table = ctx.read_parquet(my_input.path())

    my_output.write_table(
        table
        .filter(starts_with(col("name"), lit("John")))
        .to_arrow_table()
    )

We can use the same approach for DuckDB ↗ as well, as shown by the following snippet. Note that we have to take care of reading all Parquet files from the folder ourselves, hence the **/*.parquet glob in the query.

import duckdb
from transforms.api import lightweight, transform, Output, Input


@lightweight
@transform(my_input=Input('my-input'), my_output=Output('my-output'))
def my_duckdb_transform(my_input, my_output):
    duckdb.connect(database=':memory:').execute(f"""
        COPY (
            SELECT *
            FROM parquet_scan('{my_input.path()}/**/*.parquet')
            WHERE Name LIKE 'John%'
        )
        TO '{my_output.path_for_write_table}'
        (FORMAT 'parquet', PER_THREAD_OUTPUT TRUE)
    """)  # Optimize performance by writing a separate Parquet file per thread in parallel

    my_output.write_table(my_output.path_for_write_table)

Utilizing cuDF

You can also achieve integration through the use of pandas.DataFrame objects. The following snippet shows an instance of using cuDF ↗ in a lightweight transform. This essentially runs your pandas code in a highly parallelized manner on the GPU where possible.

from transforms.api import lightweight, transform, Output, Input


@lightweight(gpu_type='NVIDIA_T4', cpu_cores=4, memory_gb=32)
@transform(my_input=Input('my-input'), my_output=Output('my-output'))
def my_cudf_transform(my_input, my_output):
    import cudf  # Only import cuDF at runtime, not during CI

    df = cudf.from_pandas(my_input.pandas()[['name']])
    filtered_df = df[df['name'].str.startswith('John')]
    sorted_df = filtered_df.sort_values(by='name')
    my_output.write_table(sorted_df)

The above snippet assumes that your Foundry enrollment is equipped with an NVIDIA T4 GPU and that it is available to your Project through a resource queue.

Bring your own container workflows

Most computations can be easily expressed with just Python. However, there are use cases where one might want to rely on a non-Python execution engine or script; for instance, an F# application, an ancient VBA script, or even just an end-of-life (EOL) version of Python. The possibilities are limitless when using bring-your-own-container (BYOC) workflows. In short, BYOC enables you to create a Docker image locally with all the specific dependencies and binaries your application requires, and then run a lightweight transform on top of this image.

The following is an example of how to run a COBOL transform in Foundry. For simplicity, this example compiles the COBOL program inside the transform. Alternatively, you may pre-compile the program and copy the binary executable into the Docker image. First, enable containerized workflows, then build the Docker image locally, following these image requirements.

Consider the following minimal Dockerfile as an example:

FROM ubuntu:latest

RUN apt update && apt install -y coreutils curl sed build-essential gnucobol

RUN useradd --uid 5001 user
USER 5001

Note that the Docker image does not need to have Python installed for lightweight to work.

Then, build the image and upload it to Foundry. To do so, create an Artifacts repository and follow these instructions to tag and push your image. In this example, the image is tagged with the name my-image and version 0.0.1. The final step before authoring the BYOC lightweight transform is to add this Artifacts repository as a backing repository to the Code Repository.

The following example considers a COBOL script that generates a CSV file, with its source code located at resources/data_generator.cbl inside the Code Repository.

The final step is to write a lightweight transform that connects the data processing program to Foundry. The following snippet demonstrates how to access the dataset through the Python API while also invoking arbitrary executables shipped inside the image in question. To invoke the COBOL executable, use the Python standard library's functions (in this case, os.system(...)).

import os

import pandas as pd

from transforms.api import Output, transform, lightweight


@lightweight(container_image='my-image', container_tag='0.0.1')
@transform(my_output=Output('my-output'))
def compile_cobol_data_generator(my_output):
    """Demonstrate how we can bring dependencies that would be difficult to get through Conda."""
    # Compile the COBOL program
    # (Everything from the src folder is available in $USER_WORKING_DIR/user_code)
    os.system("cobc -x -free -o data_generator $USER_WORKING_DIR/user_code/resources/data_generator.cbl")

    # Run the program to create and populate data.csv
    os.system('$USER_WORKING_DIR/data_generator')

    # Store the results into Foundry
    my_output.write_table(pd.read_csv('data.csv'))

Preview

Preview is not yet supported for BYOC workflows.

Selecting the Build button instantiates a container from our Docker image and invokes the specified commands. Resource allocation, logging, communication with Foundry, permission enforcement, and auditability are all taken care of automatically.

Incremental workflows

Incremental lightweight transforms are supported in version 0.556.0 or later of the foundry-transforms-lib-python library.

To write an incremental lightweight transform, use the @lightweight decorator followed by the @incremental decorator. The following code shows an example:

from transforms.api import incremental, Input, lightweight, Output, transform


@lightweight()
@incremental(require_incremental=True)
@transform(my_input=Input("my-input"), my_output=Output('my-output'))
def my_incremental_transform(my_input, my_output):
    my_output.write_pandas(my_input.pandas(mode="added"))

File processing workflows

Lightweight transforms can process the files of datasets that do not have schemas. The foundry-transforms-lib-python library is required to do so.

To write a lightweight transform that processes the files of a dataset without a schema, use the @lightweight decorator and list the files via my_input.filesystem().ls(). The .filesystem().ls() call is available for datasets without schemas, but the .path(), .pandas(), .polars(), .arrow(), and .filesystem().files() calls are only available on datasets with schemas.

Code example: Lightweight transform for file processing

The following code shows an example lightweight transform that processes files of a dataset without a schema.

from transforms.api import incremental, Input, lightweight, Output, transform


@lightweight()
@incremental()
@transform(my_input=Input("my-input"), my_output=Output('my-output'))
def my_incremental_transform(my_input, my_output):
    fs = my_input.filesystem()

    # List all files in the input dataset
    files = [f.path for f in fs.ls()]

    polars_dataframes = []
    for curr_file_as_row in files:
        # Access the file
        with fs.open(curr_file_as_row, "rb") as f:
            # <do something with the file and append the parsed data
            # as a Polars DataFrame to polars_dataframes>
            ...

    # Union all the DataFrames into one
    # (union_polars_dataframes is defined in the Excel example below)
    combined_df = union_polars_dataframes(polars_dataframes)

    my_output.write_table(combined_df)

The following code shows an example that parses Excel files:

from transforms.api import transform, Input, Output, lightweight
import tempfile
import shutil
import polars as pl
import pandas as pd


@lightweight()
@transform(
    my_output=Output("/path/tabular_output_dataset"),
    my_input=Input("/path/input_dataset_without_schema"),
)
def compute(my_input, my_output):
    # List all files in the input dataset
    files = [f.path for f in my_input.filesystem().ls()]

    # Parse each file:
    # Open the Excel file at the provided path, using the provided filesystem
    def read_excel_to_polars(fs, file_path):
        with fs.open(file_path, "rb") as f:
            with tempfile.TemporaryFile() as tmp:
                # Copy the file from the source dataset to the local filesystem
                shutil.copyfileobj(f, tmp)
                tmp.flush()  # shutil.copyfileobj does not flush

                # Read the Excel file (the file is now seekable)
                pandas_df = pd.read_excel(tmp)

                # Convert eventual integer columns to string columns
                pandas_df = pandas_df.astype(str)

                # Convert the pandas DataFrame to a Polars DataFrame
                return pl.from_pandas(pandas_df)

    fs = my_input.filesystem()

    polars_dataframes = []
    for curr_file_as_row in files:
        polars_dataframes.append(read_excel_to_polars(fs, curr_file_as_row))

    def union_polars_dataframes(dfs):
        return pl.concat(dfs)

    # Union all the DataFrames into one
    combined_df = union_polars_dataframes(polars_dataframes)

    my_output.write_table(combined_df)

Next steps

To learn more about lightweight transforms, feel free to install the Lightweight examples Marketplace product from your Foundry deployment's Reference Resources Marketplace store, or navigate to the Transforms API reference.