Lightweight transforms do not support executing PySpark queries. Instead, queries must be written using alternative APIs.
Spark transforms provide substantial platform capabilities, particularly with scaling computation across many nodes when the data is already available locally. However, many data transformations could potentially be managed by a single machine. In circumstances when a single machine is sufficient for data processing, you can choose to opt out of using Spark in order to reduce infrastructure overhead by using compute engines better optimized for single-node use cases.
This documentation describes the @lightweight decorator for the transforms API, which can be placed atop @transform, @transform_pandas, and @external_systems in order to opt out of Spark and request infrastructure better suited to processing datasets of up to around 10 million rows. @lightweight also provides first-class integration with Polars, a modern compute engine optimized for single-node transformations.
The actual performance depends on the complexity of both the pipeline and the data. Therefore, you are encouraged to compare your transform's running time with and without the @lightweight backend.
Spark DataFrames and the Spark context are not available when running with the Lightweight backend.
Many of the existing transforms features are available when using @lightweight. You can mix regular and Lightweight transforms in the same repository, Preview them, and package and install Lightweight transforms through Marketplace. However, there are also some unsupported features which you can read more about below.
The following sections showcase the Lightweight transform API. To see concrete examples of the API being used for interacting with different data processing engines, review Lightweight examples.
Before using @lightweight, ensure you have performed the following prerequisite steps:
Install foundry-transforms-lib-python from the Libraries tab.
Instead of relying on Spark profiles to request resources, you can request resources at a finer granularity when invoking the decorator, through the cpu_cores keyword argument and either memory_gb or memory_mb. By default, the maximum allowed values are 8 cores and 32 GB of memory. To increase these limits, contact Palantir Support.
```python
import polars as pl
from transforms.api import transform, Input, Output, lightweight


@lightweight(cpu_cores=3.4, memory_gb=16)
@transform(output=Output('/Project/folder/output'), dataset=Input('/Project/folder/input'))
def compute(output, dataset):
    # Keep only the rows whose Name starts with 'A', using Polars in lazy mode
    output.write_table(dataset.polars(lazy=True).filter(pl.col('Name').str.starts_with('A')))
```
You can fine-tune the resources required by your transformation by passing the resource request as arguments to @lightweight(). The values in the snippet above reflect the usual defaults for Spark transforms.
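As a rough illustration of the default limits described above, the following helper checks a resource request before you put it in the decorator. Note that check_resource_request is a hypothetical function written for this example and is not part of transforms.api; the limits are the defaults quoted in this section and may be higher on your enrollment.

```python
# Default maximums quoted in this section; your enrollment may allow more.
MAX_CPU_CORES = 8
MAX_MEMORY_GB = 32


def check_resource_request(cpu_cores, memory_gb):
    """Return a list of problems with a resource request (empty if it fits)."""
    problems = []
    if cpu_cores <= 0:
        problems.append("cpu_cores must be positive")
    elif cpu_cores > MAX_CPU_CORES:
        problems.append(f"cpu_cores={cpu_cores} exceeds the default limit of {MAX_CPU_CORES}")
    if memory_gb <= 0:
        problems.append("memory_gb must be positive")
    elif memory_gb > MAX_MEMORY_GB:
        problems.append(f"memory_gb={memory_gb} exceeds the default limit of {MAX_MEMORY_GB}")
    return problems


# The request from the snippet above fits within the defaults.
print(check_resource_request(cpu_cores=3.4, memory_gb=16))
# A larger request is reported as exceeding both limits.
print(check_resource_request(cpu_cores=12, memory_gb=64))
```

A check like this only mirrors the documented defaults; the platform remains the source of truth for what is actually allowed.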
An added benefit of the resource provisioning API is that you are now able to request GPUs in a streamlined way:
```python
import torch  # don't forget to add pytorch and pytorch-gpu to your meta.yaml
import logging
from transforms.api import transform, Output, lightweight


@lightweight(gpu_type='NVIDIA_T4')
@transform(out=Output('/Project/folder/output'))
def compute(out):
    logging.info(torch.cuda.get_device_name(0))
```
The above snippet assumes that your Foundry enrollment is equipped with an NVIDIA T4 GPU and it is available to the Project.
When using @lightweight on top of @transform_pandas, you can use the same API as you would without @lightweight. Using @lightweight on top of @transform provides you with additional methods on your user function's inputs and outputs.
```python
from transforms.api import transform, Input, Output, lightweight


@lightweight
@transform(output=Output('/Project/folder/output'), dataset=Input('/Project/folder/input'))
def compute(output, dataset):
    polars_df = dataset.polars()          # polars_df is a polars.DataFrame object
    lazy_df = dataset.polars(lazy=True)   # activate streaming mode, lazy_df is a polars.LazyFrame
    pandas_df = dataset.pandas()          # pandas_df is a pandas.DataFrame
    arrow_table = dataset.arrow()         # arrow_table is a pyarrow.Table
    output.write_table(lazy_df)           # any of the above formats can be passed to write_table
```
Refer to the above snippet to see the available dataset formats. Note that calling dataset.pandas() expects Pandas to be installed in your environment. Likewise, dataset.polars(...) requires Polars to be installed.
The Lightweight inputs and outputs also expose the well-known methods such as .filesystem(). The snippet below shows that unstructured files can be handled in the same way as without @lightweight.
```python
from transforms.api import transform, Input, Output, lightweight


@lightweight
@transform(my_output=Output('/Project/folder/output'), my_input=Input('/Project/folder/input'))
def files(my_input, my_output):
    # Copy every file from the input dataset to the output dataset unchanged
    for file in my_input.filesystem().ls():
        with my_input.filesystem().open(file.path, "rb") as f1:
            with my_output.filesystem().open(file.path, "wb") as f2:
                f2.write(f1.read())
```
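The snippet above reads each file fully into memory with f1.read(). For large files, a chunked copy keeps memory usage bounded. The sketch below demonstrates the idea with in-memory streams standing in for the file handles. It assumes that the objects returned by filesystem().open(...) behave like standard Python binary file objects and accept a size argument to read(), which this documentation does not confirm; copy_in_chunks is a hypothetical helper name.

```python
import io
import shutil


def copy_in_chunks(src, dst, chunk_size=1024 * 1024):
    """Copy one binary stream to another, at most chunk_size bytes at a time."""
    shutil.copyfileobj(src, dst, chunk_size)


# In-memory stand-ins for the input and output file handles (assumption:
# the real handles support read(n) and write(bytes) in the same way).
payload = b"x" * (3 * 1024 * 1024 + 17)
source = io.BytesIO(payload)
target = io.BytesIO()

copy_in_chunks(source, target)
print(len(target.getvalue()) == len(payload))
```

If the assumption holds, replacing f2.write(f1.read()) with copy_in_chunks(f1, f2) would give the same result without holding the whole file in memory.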
Using transform generators with Lightweight transforms is also supported.
```python
from transforms.api import transform, Input, Output, lightweight


# root_folder and polars_implementation are assumed to be defined elsewhere in the module
def create_transforms():
    results = []
    for size in [10, 20]:
        @lightweight
        @transform(
            output=Output(f"{root_folder}/demo-outputs/lightweight-polars-{size}"),
            df=Input(f"{root_folder}/demo-inputs/people-{size}")
        )
        def lightweight_polars(output, df):
            output.write_table(polars_implementation(df.polars(lazy=True)))

        results.append(lightweight_polars)
    return results


TRANSFORMS = create_transforms()
```
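One point worth keeping in mind when generating transforms in a loop: values used in decorator arguments (such as size in the paths above) are evaluated once per iteration, but names referenced inside the function body are looked up when the function runs. This is standard Python closure behavior rather than anything specific to Lightweight transforms. The plain-Python sketch below uses hypothetical make_late and make_bound helpers to show the pitfall and a common fix, which is to bind the loop variable through a factory function.

```python
def make_late():
    """Build functions in a loop; the body reads the loop variable lazily."""
    funcs = []
    for size in [10, 20]:
        def compute():
            return size  # looked up at call time, so it sees the final loop value
        funcs.append(compute)
    return funcs


def make_bound():
    """Build functions through a factory so each one keeps its own value."""
    def build(size):
        def compute():
            return size  # bound to the argument passed to build()
        return compute
    return [build(size) for size in [10, 20]]


print([f() for f in make_late()])   # [20, 20]
print([f() for f in make_bound()])  # [10, 20]
```

If the body of a generated transform needs the loop variable, the factory pattern from make_bound avoids every generated function seeing only the last value.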
The following features are not yet supported by Lightweight transforms:
Polars is a modern compute engine optimized for single-node transformations. Polars is written in Rust and provides a thin Python wrapper to allow you to author your code in Python while still exploiting native acceleration during pipeline execution.
For example, this is the Polars pipeline used by Palantir to benchmark Lightweight transforms:
```python
import polars as pl


def polars_implementation(polars_df):
    polars_df = polars_df.with_columns(
        pl.col("id").cast(pl.Int64).alias("id")
    )

    # One row per (follower, followed) pair
    reciprocated_follows = (
        polars_df
        .explode("follows")
        .select([
            pl.col("id").alias("id1"),
            pl.col("follows").cast(pl.Int64).alias("id2"),
        ])
    )

    return (
        polars_df
        .join(
            reciprocated_follows
            .join(
                reciprocated_follows,
                left_on=["id1", "id2"],
                right_on=["id2", "id1"],
                how="inner"
            )
            .group_by("id1")
            .agg(pl.count("id2").alias("reciprocated_follows_count")),
            left_on="id",
            right_on="id1",
            how="left",
        )
        .drop(["email", "dob", "id1", "follows"])
    )
```
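To make the intent of this pipeline easier to follow, here is a plain-Python sketch of what it appears to compute: for each id, the number of ids it follows that also follow it back. This is our reading of the Polars code above, not an official description. The function name and sample data are hypothetical, and the sketch assumes each follows list contains no duplicates.

```python
def reciprocated_follows_count(follows_by_id):
    """Map each id to the number of followed ids that follow it back.

    Ids with no reciprocated follows map to None, mirroring the nulls a
    left join produces for rows without a match.
    """
    counts = {}
    for person, follows in follows_by_id.items():
        mutual = sum(
            1 for other in follows
            if person in follows_by_id.get(other, [])
        )
        counts[person] = mutual or None
    return counts


# Hypothetical sample: 1 and 2 follow each other; 3 follows nobody.
sample = {1: [2, 3], 2: [1], 3: []}
print(reciprocated_follows_count(sample))  # {1: 1, 2: 1, 3: None}
```

The Polars version expresses the same idea as a self-join on the exploded follows column, which lets the engine parallelize the work instead of looping row by row.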
Polars is the recommended data processing library for authoring Lightweight transforms. When possible, we suggest using Polars in streaming mode, which loads the data chunk by chunk and enables processing of datasets larger than the available memory. You can access streaming mode by calling the .polars(lazy=True) method. Currently, UDFs, aggregations such as GROUP BY operations, and sorting are not fully supported in streaming mode.
Because your transformation runs in a container managed by an orchestration layer, the container may be terminated if it exceeds its memory limit. Polars is not aware of this limit when in streaming mode and can potentially exceed it. If you encounter an out-of-memory (OOM) error, increase the container's memory limit by setting @lightweight(memory_gb=32) or another appropriate value.
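To build intuition for why chunk-by-chunk processing can handle data larger than memory, the following is a conceptual sketch in plain Python. It is not how Polars implements streaming mode, and iter_chunks and filter_in_chunks are hypothetical names introduced for illustration. The point is only that a row-wise filter needs to hold a single chunk at a time.

```python
from itertools import islice


def iter_chunks(rows, chunk_size):
    """Yield lists of at most chunk_size rows from any iterable."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


def filter_in_chunks(rows, predicate, chunk_size=1000):
    """Yield matching rows while holding only one chunk in memory at a time."""
    for chunk in iter_chunks(rows, chunk_size):
        for row in chunk:
            if predicate(row):
                yield row


names = ["Alice", "Bob", "Anna", "Carol", "Adam"]
matches = list(filter_in_chunks(names, lambda n: n.startswith("A"), chunk_size=2))
print(matches)  # ['Alice', 'Anna', 'Adam']
```

Operations that need to see every row at once, such as a full sort, do not decompose into independent chunks this neatly.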
Learn more about the Lightweight API in the Transforms API reference or consider some real-world examples in Lightweight examples.