The basic principle of the following integrations is that you can access tabular Foundry datasets in multiple formats, such as a pandas DataFrame, Arrow Table, Polars DataFrame, and even as raw Parquet or CSV files. This is also shown in the transforms overview. When saving tables from memory to Foundry, we can pass them in any of the formats in which they have been read.
Sometimes, it is easier to cut out the data deserialization step and pass the raw underlying dataset files directly to our compute engine. Calling my_input.path() returns the path to the files on disk, which are downloaded on demand. When it comes to writing raw files back to Foundry, we have two limitations to keep in mind:

1. The files must be in Parquet format.
2. The files must be placed in the folder located at my_output.path_for_write_table.

When both criteria are met, we can call write_table with the path to this folder, like so:
```python
my_output.write_table(my_output.path_for_write_table)
```
To see this in action, consider the following example demonstrating how to use DataFusion ↗ in the platform.
```python
import datafusion
from datafusion import lit
from datafusion.functions import col, starts_with
from transforms.api import transform, Output, Input


@transform.using(my_input=Input('my-input'), my_output=Output('my-output'))
def my_datafusion_transform(my_input, my_output):
    ctx = datafusion.SessionContext()
    table = ctx.read_parquet(my_input.path())
    my_output.write_table(
        table
        .filter(starts_with(col("name"), lit("John")))
        .to_arrow_table()
    )
```
Integration is also possible through pandas.DataFrame. The following snippet shows cuDF ↗ used in a lightweight transform, which essentially runs pandas code in a highly parallelized manner on the GPU where possible.
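Because cuDF mirrors the pandas API, the same filter-and-sort logic can be tried with plain pandas on the CPU first (toy data invented for illustration); cudf.from_pandas then runs the equivalent operations on the GPU:

```python
import pandas as pd

df = pd.DataFrame({"name": ["Johnny", "Alice", "John"]})

# Same operations as in the cuDF snippet below, executed on the CPU.
filtered_df = df[df["name"].str.startswith("John")]
sorted_df = filtered_df.sort_values(by="name")
```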
```python
@transform.using(
    my_input=Input('my-input'),
    my_output=Output('my-output')
).with_resources(
    cpu_cores=4,
    memory_gb=32,
    gpu_type='NVIDIA_T4'
)
def my_cudf_transform(my_input, my_output):
    import cudf  # Only import cuDF at runtime, not during CI
    df = cudf.from_pandas(my_input.pandas()[['name']])
    filtered_df = df[df['name'].str.startswith('John')]
    sorted_df = filtered_df.sort_values(by='name')
    my_output.write_table(sorted_df)
```
The above snippet assumes that your Foundry enrollment is equipped with a supported NVIDIA GPU (here, an NVIDIA T4) and that it is available to your project through a resource queue. See GPU provisioning for the list of available GPU types.
You can accelerate Polars lazy API queries on GPUs using cudf-polars ↗, a GPU-backed execution engine for Polars. This allows you to run existing Polars lazy API queries on GPUs with minimal code changes by passing pl.GPUEngine() to the collect() call. For details on supported operations and performance characteristics, see the Polars GPU support documentation ↗.
GPU-accelerated collection currently supports only collect(), which fully materializes results in memory. Streaming operations such as sink_parquet() are not yet supported. Ensure that your output data fits in available GPU and system memory.
To use GPU acceleration for the Polars lazy API, add the cudf-polars package to the run requirements in your conda_recipe/meta.yaml file. For details on adding packages, refer to the Python libraries documentation.
```yaml
requirements:
  run:
    - python
    - transforms {{ PYTHON_TRANSFORMS_VERSION }}
    - cudf-polars
```
Next, request a GPU in your transform by chaining .with_resources() onto your decorator:
```python
from transforms.api import transform, Output, Input


@transform.using(
    output=Output("/Users/jsmith/output"),
    my_input=Input("/Users/jsmith/input"),
).with_resources(
    gpu_type="NVIDIA_T4",
    memory_gb=32,
)
def compute(output, my_input):
    ...
```
Build your query using the standard Polars lazy API, then pass pl.GPUEngine() to collect() to execute it on the GPU:
```python
import polars as pl
from transforms.api import transform, Output, Input


@transform.using(
    output=Output("/Users/jsmith/output"),
    my_input=Input("/Users/jsmith/input"),
).with_resources(
    gpu_type="NVIDIA_T4",
    memory_gb=32,
)
def compute(output, my_input):
    lf = my_input.polars(lazy=True)
    result = (
        lf.with_columns(
            (pl.col("a") * pl.col("b")).alias("product"),
            (pl.col("a").pow(2) + pl.col("b").pow(2)).sqrt().alias("norm"),
        )
        .group_by("category")
        .agg(
            pl.col("product").sum().alias("total_product"),
            pl.col("norm").mean().alias("mean_norm"),
            pl.len().alias("n"),
        )
        .sort("total_product", descending=True)
        .collect(engine=pl.GPUEngine())
    )
    output.write_table(result)
```
The GPU engine only supports a subset of Polars operations. For the full list, see the Polars GPU support documentation ↗. By default, unsupported queries silently fall back to the CPU engine. To raise an error instead, pass raise_on_fail=True:
```python
df.collect(engine=pl.GPUEngine(raise_on_fail=True))
```