Transforms Python API

The Transforms Python API provides classes and decorators for constructing a Pipeline. This page documents the available functions and decorators; you can also read more about the available classes.

  • configure([profile]): Decorator to modify the configuration of a transform.
  • incremental([require_incremental, ...]): Decorator to convert inputs and outputs into their transforms.api.incremental counterparts.
  • lightweight([cpu_cores, memory_mb, memory_gb, gpu_type, container_image, container_tag, container_shell_command]): Decorator to make the decorated transform run on the container transforms infrastructure.
  • transform(ios): Wrap up a compute function as a Transform object.
  • transform_df(output, inputs): Register the wrapped compute function as a dataframe transform.
  • transform_pandas(output, inputs): Register the wrapped compute function as a pandas transform.
  • transform_polars(output, inputs): Register the wrapped compute function as a Polars transform.

configure

transforms.api.configure(profile=None, allowed_run_duration=None, run_as_user=False)

  • Decorator to modify the configuration of a transform.
  • The configure decorator must be used to wrap a Transform:
>>> @configure(profile=['EXECUTOR_MEMORY_MEDIUM'])
... @transform(...)
... def my_compute_function(...):
...     pass

Parameters

  • profile (str ↗ or List[str ↗], optional)
    • The name of the transforms profile(s) to use.
  • allowed_run_duration (timedelta ↗, optional)
    • An upper limit on how long this job can run before it is failed. Use with caution: when configuring an allowed duration, account for variables such as changes in data scale or shape. The duration has minute precision only. IMPORTANT: Take care when using this with incremental transforms, as the run duration can change significantly when the transform runs as a snapshot.
  • run_as_user (Boolean, optional)
    • Determines whether a transform runs with user permissions. When enabled, a job can behave differently depending on the permissions of the user running the job.
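
A minimal sketch combining these parameters is shown below; the profile name, duration, and dataset paths are illustrative placeholders.

>>> from datetime import timedelta
>>> @configure(
...     profile=['EXECUTOR_MEMORY_MEDIUM'],       # one or more transforms profiles
...     allowed_run_duration=timedelta(hours=2),  # fail the job if it runs longer than this
...     run_as_user=True,                         # run with the permissions of the user running the job
... )
... @transform_df(
...     Output('/path/to/output/dataset'),
...     source=Input('/path/to/input/dataset'),
... )
... def my_compute_function(source):
...     return source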

incremental

transforms.api.incremental(require_incremental=False, semantic_version=1, snapshot_inputs=None)

  • Decorator to convert inputs and outputs into their transforms.api.incremental counterparts.
  • The incremental decorator must be used to wrap a Transform:
>>> @incremental()
... @transform(...)
... def my_compute_function(...):
...     pass

The decorator reads build history from the output datasets to determine the state of the inputs at the time of the last build. This information is used to convert the TransformInput, TransformOutput, and TransformContext objects into their incremental counterparts: IncrementalTransformInput, IncrementalTransformOutput, and IncrementalTransformContext.

This decorator can also be used to wrap the transform_df() and transform_pandas() decorators. These decorators call dataframe() and pandas() on the inputs, without any arguments, to extract the PySpark and pandas DataFrame objects. This means the default added read mode is always used and the write mode is determined by the incremental decorator. To read or write using any of the non-default modes, you must use the transform() decorator, as in the sketch below.
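
A minimal sketch of this pattern, with the dataset paths and names as illustrative placeholders: the added read mode and the modify write mode shown here are the incremental defaults, but transform() lets you request any mode explicitly.

>>> @incremental()
... @transform(
...     source=Input('/path/to/input/dataset'),
...     processed=Output('/path/to/output/dataset'),
... )
... def my_compute_function(source, processed):
...     # type: (IncrementalTransformInput, IncrementalTransformOutput) -> None
...     new_rows = source.dataframe('added')  # only the rows added since the last build
...     processed.set_mode('modify')          # add the new rows to the existing output
...     processed.write_dataframe(new_rows)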

If the added output rows of your PySpark or pandas transform are a function only of the added input rows (per the APPEND example), the default modes will produce a correct incremental transform.

If your transform takes an input dataset that receives SNAPSHOT transactions but whose updates do not affect the ability to run the transform incrementally (such as a reference table), review the snapshot_inputs argument to avoid running the transform as a full SNAPSHOT.

If your transform performs complex logic (involving joins, aggregations, distinct, and so on), we recommend that you read the incremental documentation before using this decorator.

Parameters

  • require_incremental (Boolean ↗, optional)
    • If True, the transform will refuse to run non-incrementally unless the transform has never been run before. This is determined based on all output datasets having no committed transactions.
  • semantic_version (int ↗, optional)
    • Defaults to 1. This number represents the semantic nature of a transform. It should be changed whenever the logic of a transform changes in a way that would invalidate the existing output. Changing this number causes a subsequent run of the transform to be run non-incrementally.
  • snapshot_inputs (list of str, optional)
    • The inputs for which a SNAPSHOT transaction does not invalidate the current output of a transform. For example, an update to a lookup table does not mean previously computed outputs are incorrect. A transform is run incrementally when all inputs except for these have only added or no new data. When reading snapshot_inputs, the IncrementalTransformInput will only expose the current view of the input dataset.
  • allow_retention (Boolean, optional)
    • If True, deletes made by foundry-retention will not break incrementality.
  • strict_append (Boolean, optional)
    • If set to True and the input datasets are incremental, the underlying Foundry transaction type is set to APPEND, and an APPEND transaction is used for incremental writes; trying to overwrite an existing file will lead to an exception. If the input datasets are not incremental, strict_append will run as a SNAPSHOT, and overwriting an existing file will succeed. To ensure the code runs incrementally as an APPEND, use require_incremental=True. Note that the write operation may not overwrite any files, even auxiliary ones such as Parquet summary metadata or Hadoop SUCCESS files. Incremental writes for all Foundry formats should support strict_append mode.
  • v2_semantics (Boolean, optional)
    • Defaults to False. If True, will use v2 incremental semantics. There should be no difference in behavior between v2 and v1 incremental semantics, and we recommend all users set this to True. Non-Catalog incremental inputs and outputs are only supported if using v2 semantics.
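
The sketch below illustrates the snapshot_inputs pattern described above; the dataset paths, names, and join key are illustrative placeholders. SNAPSHOT updates to the lookup table do not force the transform to run as a full snapshot, and newly added rows are joined against the current view of the lookup table.

>>> @incremental(snapshot_inputs=['lookup'])
... @transform(
...     events=Input('/path/to/events/dataset'),
...     lookup=Input('/path/to/lookup/dataset'),
...     out=Output('/path/to/output/dataset'),
... )
... def my_compute_function(events, lookup, out):
...     new_events = events.dataframe()  # default 'added' read mode
...     enriched = new_events.join(lookup.dataframe(), on='key', how='left')
...     out.write_dataframe(enriched)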


lightweight

transforms.api.lightweight(cpu_cores=2, memory_mb=None, memory_gb=16, gpu_type=None, container_image=None, container_tag=None, container_shell_command=None)

  • Decorator to make the decorated transform run on top of the container transforms infrastructure.
  • The lightweight decorator must be used to wrap a Transform:
>>> @lightweight
... @transform(
...     my_input=Input('/input'),
...     my_output=Output('/output')
... )
... def compute_func(my_input, my_output):
...     my_output.write_pandas(my_input.pandas())

>>> @lightweight()
... @transform(
...     my_input=Input('/input'),
...     my_output=Output('/output')
... )
... def compute_func(my_input, my_output):
...     for file in my_input.filesystem().ls():
...         with my_input.filesystem().open(file.path) as f1:
...             with my_output.filesystem().open(file.path, "w") as f2:
...                 f2.write(f1.read())

>>> @lightweight(memory_gb=3.5)
... @transform_pandas(
...     Output('/output'),
...     my_input=Input('/input')
... )
... def compute_func(my_input):
...     return my_input

>>> @lightweight(container_image='my-image', container_tag='0.0.1')
... @transform(my_output=Output('ri...my_output'))
... def run_data_generator_executable(my_output):
...     os.system('$USER_WORKING_DIR/data_generator')
...     my_output.write_table(pd.read_csv('data.csv'))

A lightweight transform is a transform that runs without Spark, on a single node. Lightweight transforms are faster and more cost-effective for small to medium-sized datasets. However, lightweight transforms support only a subset of the API of a regular transform, including pandas and the filesystem API, while also providing additional methods for accessing datasets. For more information on lightweight transforms, review our documentation.

To use this decorator, foundry-transforms-lib-python must be added as a dependency.

If any of container_image, container_tag, or container_shell_command is set, then both container_image and container_tag must be set. If container_shell_command is not set, a default entrypoint is used that bootstraps a Python environment and executes the user code specified in the transform.

Specifying the container_* arguments is referred to as a bring-your-own-container (BYOC) workflow. This guarantees that all files from the user's code repository will be available inside $USER_WORKING_DIR/user_code at runtime and that a Python environment will be available as well.

The image specified by container_image and container_tag must be available from an Artifacts backing repository of the code repository. For more details, review the BYOC documentation.

Parameters

  • cpu_cores (float, optional)
    • The number of CPU cores to allocate to the transform; can be a fraction. The default is 2.
  • memory_mb (float, optional)
    • The amount of memory to allocate to the container, in MB. Either memory_gb or memory_mb may be specified, but not both.
  • memory_gb (float, optional)
    • The amount of memory to allocate to the container, in GB. The default is 16GB.
  • gpu_type (str, optional)
    • The type of GPU to allocate to the transform.
  • container_image (str, optional)
    • The image to use for the transform's container.
  • container_tag (str, optional)
    • The image tag to use for the transform's container.
  • container_shell_command (str, optional)
    • The shell command to execute in the container. When not specified, a default command is used that executes the user's transform once the container starts.
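
For example, to adjust only the compute resources (a sketch; the resource values and dataset paths are illustrative, and memory_gb and memory_mb remain mutually exclusive):

>>> @lightweight(cpu_cores=4, memory_gb=32)
... @transform_pandas(
...     Output('/path/to/output/dataset'),
...     my_input=Input('/path/to/input/dataset'),
... )
... def compute_func(my_input):
...     return my_input.dropna()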

transform

transforms.api.transform(ios)

  • Wrap up a compute function as a Transform object.

  • The transform decorator is used to construct a Transform object from a compute function. The names used for inputs and outputs should be the parameter names of the wrapped compute function. At compute-time, the function is passed its inputs and outputs as TransformInput and TransformOutput objects.

>>> @transform(
...     first_input=Input('/path/to/first/input/dataset'),
...     second_input=Input('/path/to/second/input/dataset'),
...     first_output=Output('/path/to/first/output/dataset'),
...     second_output=Output('/path/to/second/output/dataset'),
... )
... def my_compute_function(first_input, second_input, first_output, second_output):
...     # type: (TransformInput, TransformInput, TransformOutput, TransformOutput) -> None
...     first_output.write_dataframe(first_input.dataframe())
...     second_output.write_dataframe(second_input.dataframe())

Parameters

  • ios (Input or Output)
    • kwargs (keyword arguments) comprised of named Input and Output specs.

The compute function is responsible for writing data to its outputs.

Optionally, the TransformContext and corresponding SparkSession can also be accessed within the compute function. You can use this to create empty data frames, apply Spark configurations, and so on. Where possible, we recommend using existing default Spark profiles over setting the Spark config values through the SparkSession object.

>>> @transform(
...     output=Output('/path/to/first/output/dataset'),
... )
... def my_compute_function(ctx, output):
...     # type: (TransformContext, TransformOutput) -> None
...
...     # In this example, the Spark session is used to create an empty data frame.
...     columns = [
...         StructField("col_a", StringType(), True)
...     ]
...     empty_df = ctx.spark_session.createDataFrame([], schema=StructType(columns))
...
...     output.write_dataframe(empty_df)

transform_df

transforms.api.transform_df(output, inputs)

  • Register the wrapped compute function as a dataframe transform.
  • The transform_df decorator is used to construct a Transform object from a compute function that accepts and returns pyspark.sql.DataFrame objects. Similar to the transform() decorator, the input names become the compute function's parameter names. However, a transform_df accepts only a single Output spec as a positional argument. The return value of the compute function is also a DataFrame that is automatically written out to the single output dataset.
>>> @transform_df(
...     Output('/path/to/output/dataset'),  # An unnamed Output spec
...     first_input=Input('/path/to/first/input/dataset'),
...     second_input=Input('/path/to/second/input/dataset'),
... )
... def my_compute_function(first_input, second_input):
...     # type: (pyspark.sql.DataFrame, pyspark.sql.DataFrame) -> pyspark.sql.DataFrame
...     return first_input.union(second_input)

Parameters

  • output (Output)
    • The single Output spec for the transform.
  • inputs (Input)
    • kwargs (keyword arguments) comprised of named Input specs.

Optionally, the TransformContext and corresponding SparkSession can also be accessed within the compute function. You can use this to create empty data frames, apply Spark configurations, and so on. Where possible, we recommend using existing default Spark profiles over setting the Spark config values via the SparkSession object.

>>> @transform_df(
...     Output('/path/to/output/dataset')
... )
... def my_compute_function(ctx):
...     # type: (TransformContext) -> pyspark.sql.DataFrame
...
...     # In this example, the Spark session is used to create an empty data frame.
...     columns = [
...         StructField("col_a", StringType(), True)
...     ]
...     return ctx.spark_session.createDataFrame([], schema=StructType(columns))

transform_pandas

transforms.api.transform_pandas(output, inputs)

  • Register the wrapped compute function as a pandas transform.

To use the pandas library, you must add pandas as a run dependency in your meta.yml file.

The transform_pandas decorator is used to construct a Transform object from a compute function that accepts and returns pandas.DataFrame objects. This decorator is similar to the transform_df() decorator; however, the pyspark.sql.DataFrame objects are converted to pandas.DataFrame objects before the computation and converted back afterwards.

>>> @transform_pandas(
...     Output('/path/to/output/dataset'),  # An unnamed Output spec
...     first_input=Input('/path/to/first/input/dataset'),
...     second_input=Input('/path/to/second/input/dataset'),
... )
... def my_compute_function(first_input, second_input):
...     # type: (pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame
...     return pandas.concat([first_input, second_input])

Note that transform_pandas should only be used on datasets that can fit into memory. If you have larger datasets that you wish to filter down first before converting to pandas, you should write your transformation using the transform_df() decorator and the pyspark.sql.SparkSession.createDataFrame() method.

>>> @transform_df(
...     Output('/path/to/output/dataset'),  # An unnamed Output spec
...     first_input=Input('/path/to/first/input/dataset'),
...     second_input=Input('/path/to/second/input/dataset'),
... )
... def my_compute_function(ctx, first_input, second_input):
...     # type: (TransformContext, pyspark.sql.DataFrame, pyspark.sql.DataFrame) -> pyspark.sql.DataFrame
...     pdf = first_input.filter(first_input.county == 'UK').toPandas()
...     # Perform pandas operations on a subset of the data before converting back to a PySpark DataFrame
...     return ctx.spark_session.createDataFrame(pdf)

Parameters

  • output (Output)
    • The single Output spec for the transform.
  • inputs (Input)
    • kwargs (keyword arguments) comprised of named Input specs.

transform_polars

transforms.api.transform_polars(output, inputs)

  • Register the wrapped compute function as a Polars transform.

To use this decorator, you must add both foundry-transforms-lib-python and polars as run dependencies in your meta.yml file.

The transform_polars decorator is used to construct a Transform object from a compute function that accepts and returns polars.DataFrame objects. This decorator is similar to the transform_df() decorator; however, the user code is passed polars.DataFrame objects.

The transform_polars decorator is just a thin wrapper over the @lightweight decorator. Using it results in the creation of a lightweight transform which lacks some features of a regular transform. For more information on lightweight transforms, review our documentation.

Spark profiles and some other transforms features cannot be used with @lightweight transforms and thus cannot be used with @transform_polars either.

>>> @transform_polars(
...     Output('ri.main.foundry.dataset.out'),  # An unnamed Output spec
...     first_input=Input('ri.main.foundry.dataset.in1'),
...     second_input=Input('ri.main.foundry.dataset.in2'),
... )
... def my_compute_function(ctx, first_input, second_input):
...     # type: (polars.DataFrame, polars.DataFrame) -> polars.DataFrame
...     return first_input.join(second_input, on='id', how="inner")

Parameters

  • output (Output)
    • The single Output spec for the transform.
  • inputs (Input)
    • kwargs (keyword arguments) comprised of named Input specs.