The Transforms Python API provides classes and decorators for constructing a Pipeline. This page contains information on the available functions; you can also read more about the available classes.
| Function | Description |
|---|---|
| configure([profile]) | Decorator to modify the configuration of a transform. |
| incremental([require_incremental, ...]) | Decorator to convert inputs and outputs into their transforms.api.incremental counterparts. |
| lightweight([cpu_cores, memory_mb, memory_gb, gpu_type, container_image, container_tag, container_shell_command]) | Decorator to make the decorated transform run as a lightweight transform, that is, without Spark, on a single node. |
| transform(ios) | Wrap up a compute function as a Transform object. |
| transform_df(output, inputs) | Register the wrapped compute function as a dataframe transform. |
| transform_pandas(output, inputs) | Register the wrapped compute function as a pandas transform. |
| transform_polars(output, inputs) | Register the wrapped compute function as a Polars transform. |
configure

transforms.api.configure(profile=None, allowed_run_duration=None, run_as_user=False)

The configure decorator must be used to wrap a Transform:

>>> @configure(profile=['EXECUTOR_MEMORY_MEDIUM'])
... @transform(...)
... def my_compute_function(...):
...     pass
Raises: TypeError ↗ – If the decorator is not applied to a Transform object.

incremental

transforms.api.incremental(require_incremental=False, semantic_version=1, snapshot_inputs=None)

Decorator to convert inputs and outputs into their transforms.api.incremental counterparts.

The incremental decorator must be used to wrap a Transform:

>>> @incremental()
... @transform(...)
... def my_compute_function(...):
...     pass
The decorator reads build history from the output datasets to determine the state of the inputs at the time of the last build. This information is used to convert the TransformInput, TransformOutput, and TransformContext objects into their incremental counterparts: IncrementalTransformInput, IncrementalTransformOutput, and IncrementalTransformContext.
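For illustration, the following is a minimal sketch (the dataset paths and the hair column are placeholder names) of an incremental transform that processes only the rows added to the input since the last build, using the added read mode described below:

>>> @incremental()
... @transform(
...     students=Input('/examples/students_hair_eye_color'),
...     processed=Output('/examples/hair_eye_color_processed')
... )
... def filter_hair_color(students, processed):
...     # 'added' exposes only the rows appended to the input since the last build
...     new_students_df = students.dataframe('added')
...     # With the incremental defaults, this write appends the new rows to the output
...     processed.write_dataframe(new_students_df.filter(new_students_df.hair == 'Brown'))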
This decorator can also be used to wrap the transform_df() and transform_pandas() decorators. These decorators call dataframe() and pandas() on the inputs, without any arguments, to extract the PySpark and pandas DataFrame objects. This means the read mode used will always be added and the write mode will be determined by the incremental decorator. For reading or writing any of the non-default modes, you must use the transform() decorator.

If the added output rows of your PySpark or pandas transform are a function only of the added input rows (per the APPEND example), the default modes will produce a correct incremental transform.
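As a sketch of that default behavior (the dataset paths and the status column are placeholders), wrapping transform_df() is sufficient when each added output row depends only on added input rows:

>>> @incremental()
... @transform_df(
...     Output('/examples/shipped_orders'),
...     orders=Input('/examples/orders')
... )
... def filter_orders(orders):
...     # The input exposes only the rows added since the last build, and the
...     # returned rows are appended to the output.
...     return orders.filter(orders.status == 'SHIPPED')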
If your transform takes a dataset as input that has SNAPSHOT transactions but that does not alter the ability to run the transform incrementally (such as with reference tables), review the snapshot_inputs argument to avoid running a transform as a full SNAPSHOT.
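For example, here is a hedged sketch of marking a reference table as a snapshot input; the dataset paths and the country_code join key are placeholders:

>>> @incremental(snapshot_inputs=['country_codes'])
... @transform(
...     orders=Input('/examples/orders'),
...     country_codes=Input('/examples/country_codes'),  # reference table republished as SNAPSHOT
...     enriched=Output('/examples/orders_enriched')
... )
... def enrich_orders(orders, country_codes, enriched):
...     new_orders = orders.dataframe('added')
...     # A snapshot input always exposes its current view
...     codes = country_codes.dataframe()
...     enriched.write_dataframe(new_orders.join(codes, 'country_code', 'left'))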
If your transform performs complex logic (involving joins, aggregations, distinct, and so on), we recommend that you read the incremental documentation before using this decorator.
Parameters:

- require_incremental – If True, the transform will refuse to run non-incrementally unless the transform has never been run before. This is determined based on all output datasets having no committed transactions.
- semantic_version – Defaults to 1. This number represents the semantic nature of a transform. It should be changed whenever the logic of a transform changes in a way that would invalidate the existing output. Changing this number causes a subsequent run of the transform to be run non-incrementally.
- snapshot_inputs – The inputs for which a SNAPSHOT transaction does not invalidate the current output of a transform. For example, an update to a lookup table does not mean previously computed outputs are incorrect. A transform is run incrementally when all inputs except for these have only added or no new data. When reading snapshot_inputs, the IncrementalTransformInput will only expose the current view of the input dataset.
- allow_retention – If True, deletes made by foundry-retention will not break incrementality.
- strict_append – If this parameter is set to True and the input datasets are incremental, then the underlying Foundry transaction type is set to be an APPEND, and an APPEND transaction will be used for incremental writes. Trying to overwrite an existing file will lead to an exception. If the input datasets are not incremental, strict_append will run as SNAPSHOT. You should use require_incremental=True to ensure the code runs incrementally as APPEND. Trying to overwrite an existing file will succeed. Note that the write operation may not overwrite any files, even auxiliary ones such as Parquet summary metadata or Hadoop SUCCESS files. Incremental writes for all Foundry formats should support strict_append mode.
- v2_semantics – Defaults to False. If True, will use v2 incremental semantics. There should be no difference in behavior between v2 and v1 incremental semantics, and we recommend all users set this to True. Non-Catalog incremental inputs and outputs are only supported if using v2 semantics.

Raises: TypeError ↗ or KeyError ↗ – If the decorator is not applied to a Transform object.
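These arguments compose. As a hedged sketch with placeholder dataset paths, a transform that refuses to fall back to a snapshot build, writes with append-only transactions, and opts into v2 semantics might look like this:

>>> @incremental(require_incremental=True, strict_append=True, v2_semantics=True)
... @transform(
...     events=Input('/examples/events'),
...     event_log=Output('/examples/event_log')
... )
... def append_events(events, event_log):
...     # Fails rather than running as a full SNAPSHOT, and writes the newly
...     # added rows using an APPEND transaction.
...     event_log.write_dataframe(events.dataframe('added'))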
lightweight
transforms.api.lightweight(cpu_cores=2, memory_mb=None, memory_gb=16, gpu_type=None, container_image=None, container_tag=None, container_shell_command=None)

The lightweight decorator must be used to wrap a Transform:

>>> @lightweight
... @transform(
...     my_input=Input('/input'),
...     my_output=Output('/output')
... )
... def compute_func(my_input, my_output):
...     my_output.write_pandas(my_input.pandas())

>>> @lightweight()
... @transform(
...     my_input=Input('/input'),
...     my_output=Output('/output')
... )
... def compute_func(my_input, my_output):
...     for file in my_input.filesystem().ls():
...         with my_input.filesystem().open(file.path) as f1:
...             with my_output.filesystem().open(file.path, "w") as f2:
...                 f2.write(f1.read())

>>> @lightweight(memory_gb=3.5)
... @transform_pandas(
...     Output('/output'),
...     my_input=Input('/input')
... )
... def compute_func(my_input):
...     return my_input

>>> @lightweight(container_image='my-image', container_tag='0.0.1')
... @transform(my_output=Output('ri...my_output'))
... def run_data_generator_executable(my_output):
...     os.system('$USER_WORKING_DIR/data_generator')
...     my_output.write_table(pd.read_csv('data.csv'))
A lightweight transform is a transform that runs without Spark, on a single node. Lightweight transforms are faster and more cost-effective for small to medium-sized datasets. However, lightweight transforms support only a subset of a regular transform's API, including pandas and the filesystem API, while also providing additional methods for accessing datasets. For more information on lightweight transforms, review our documentation.
To use this decorator, foundry-transforms-lib-python must be added as a dependency.
If any of container_image, container_tag, or container_shell_command is set, then both container_image and container_tag must be set. If container_shell_command is not set, a default entrypoint is used that bootstraps a Python environment and executes the user code specified in the transform.
Specifying the container_* arguments is referred to as a bring-your-own-container (BYOC) workflow. This guarantees that all files from the user's code repository will be available inside $USER_WORKING_DIR/user_code at runtime and that a Python environment will be available as well.

The image specified by container_image and container_tag must be available from an Artifacts backing repository of the code repository. For more details, review the BYOC documentation.
Parameters:

- cpu_cores – Defaults to 2.
- memory_mb / memory_gb – Either memory_gb or memory_mb may be specified, but not both. memory_gb defaults to 16GB.

transform
transforms.api.transform(ios)

Wrap up a compute function as a Transform object.

The transform decorator is used to construct a Transform object from a compute function. The names used for inputs and outputs should be the parameter names of the wrapped compute function. At compute-time, the function is passed its inputs and outputs as TransformInput and TransformOutput objects.
>>> @transform(
...     first_input=Input('/path/to/first/input/dataset'),
...     second_input=Input('/path/to/second/input/dataset'),
...     first_output=Output('/path/to/first/output/dataset'),
...     second_output=Output('/path/to/second/output/dataset'),
... )
... def my_compute_function(first_input, second_input, first_output, second_output):
...     # type: (TransformInput, TransformInput, TransformOutput, TransformOutput) -> None
...     first_output.write_dataframe(first_input.dataframe())
...     second_output.write_dataframe(second_input.dataframe())
The compute function is responsible for writing data to its outputs.
Optionally, the TransformContext and corresponding SparkSession can also be accessed within the compute function. You can use this to create empty data frames, apply Spark configurations, and so on. Where possible, we recommend using existing default Spark profiles over setting the Spark config values through the SparkSession object.
>>> @transform(
...     output=Output('/path/to/first/output/dataset'),
... )
... def my_compute_function(ctx, output):
...     # type: (TransformContext, TransformOutput) -> None
...
...     # In this example, the Spark session is used to create an empty data frame.
...     columns = [
...         StructField("col_a", StringType(), True)
...     ]
...     empty_df = ctx.spark_session.createDataFrame([], schema=StructType(columns))
...
...     output.write_dataframe(empty_df)
transform_df

transforms.api.transform_df(output, inputs)

The transform_df decorator is used to construct a Transform object from a compute function that accepts and returns pyspark.sql.DataFrame ↗ objects. Similar to the transform() decorator, the input names become the compute function's parameter names. However, a transform_df accepts only a single Output spec as a positional argument. The return value of the compute function is also a DataFrame ↗ that is automatically written out to the single output dataset.

>>> @transform_df(
...     Output('/path/to/output/dataset'),  # An unnamed Output spec
...     first_input=Input('/path/to/first/input/dataset'),
...     second_input=Input('/path/to/second/input/dataset'),
... )
... def my_compute_function(first_input, second_input):
...     # type: (pyspark.sql.DataFrame, pyspark.sql.DataFrame) -> pyspark.sql.DataFrame
...     return first_input.union(second_input)
Parameters:

- output – The single Output spec for the transform.
- inputs – The Input specs.

Optionally, the TransformContext and corresponding SparkSession can also be accessed within the compute function. You can use this to create empty data frames, apply Spark configurations, and so on. Where possible, we recommend using existing default Spark profiles over setting the Spark config values via the SparkSession object.
>>> @transform_df(
...     Output('/path/to/output/dataset')
... )
... def my_compute_function(ctx):
...     # type: (TransformContext) -> pyspark.sql.DataFrame
...
...     # In this example, the Spark session is used to create an empty data frame.
...     columns = [
...         StructField("col_a", StringType(), True)
...     ]
...     return ctx.spark_session.createDataFrame([], schema=StructType(columns))
transform_pandas

transforms.api.transform_pandas(output, inputs)

To use the pandas library, you must add pandas as a run dependency in your meta.yml file.

The transform_pandas decorator is used to construct a Transform object from a compute function that accepts and returns pandas.DataFrame ↗ objects. This decorator is similar to the transform_df() decorator; however, the pyspark.sql.DataFrame ↗ objects are converted to pandas.DataFrame ↗ objects before the computation, and converted back afterwards.
>>> @transform_pandas(
...     Output('/path/to/output/dataset'),  # An unnamed Output spec
...     first_input=Input('/path/to/first/input/dataset'),
...     second_input=Input('/path/to/second/input/dataset'),
... )
... def my_compute_function(first_input, second_input):
...     # type: (pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame
...     # pandas DataFrames have no .concat method; use the top-level pandas.concat
...     return pandas.concat([first_input, second_input])
Note that transform_pandas should only be used on datasets that can fit into memory. If you have larger datasets that you wish to filter down first before converting to pandas, you should write your transformation using the transform_df() decorator and the pyspark.sql.SparkSession.createDataFrame() ↗ method.
>>> @transform_df(
...     Output('/path/to/output/dataset'),  # An unnamed Output spec
...     first_input=Input('/path/to/first/input/dataset'),
...     second_input=Input('/path/to/second/input/dataset'),
... )
... def my_compute_function(ctx, first_input, second_input):
...     # type: (TransformContext, pyspark.sql.DataFrame, pyspark.sql.DataFrame) -> pyspark.sql.DataFrame
...     pd = first_input.filter(first_input.country == 'UK').toPandas()
...     # Perform pandas operations on a subset of the data before converting back to a PySpark DataFrame
...     return ctx.spark_session.createDataFrame(pd)
Parameters:

- output – The single Output spec for the transform.
- inputs – The Input specs.

transform_polars
transforms.api.transform_polars(output, inputs)

To use this decorator, you must add both foundry-transforms-lib-python and polars as run dependencies in your meta.yml file.

The transform_polars decorator is used to construct a Transform object from a compute function that accepts and returns polars.DataFrame ↗ objects. This decorator is similar to the transform_df() decorator; however, the user code is passed polars.DataFrame ↗ objects.
The transform_polars decorator is just a thin wrapper around the @lightweight decorator. Using it results in the creation of a lightweight transform, which lacks some features of a regular transform. For more information on lightweight transforms, review our documentation.

Spark profiles and some other transforms features cannot be used with @lightweight transforms and thus cannot be used with @transform_polars either.
>>> @transform_polars(
...     Output('ri.main.foundry.dataset.out'),  # An unnamed Output spec
...     first_input=Input('ri.main.foundry.dataset.in1'),
...     second_input=Input('ri.main.foundry.dataset.in2'),
... )
... def my_compute_function(ctx, first_input, second_input):
...     # type: (polars.DataFrame, polars.DataFrame) -> polars.DataFrame
...     return first_input.join(second_input, on='id', how="inner")