Transforms and pipelines

In Python, transforms.api.Transform is a description of how to compute a dataset. It describes the following:

  • The input and output datasets
  • The code used to transform the input datasets into the output dataset (we’ll refer to this as the compute function), and
  • Any additional configuration defined by the configure() decorator (this includes custom Transforms profiles to use at runtime)

The input and output datasets, as well as the transformation code, are wrapped up in a Transform object and then registered to a Pipeline. You should not construct a Transform object by hand. Instead, you should use one of the decorators described below.
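
For instance, the configure() decorator mentioned above is applied on top of one of the transform decorators described below. The following is a minimal sketch only; the profile name and dataset paths are illustrative placeholders, not part of a real project:

from transforms.api import configure, transform_df, Input, Output

# A minimal sketch: 'EXECUTOR_MEMORY_MEDIUM' and the dataset paths below are
# placeholder values used purely for illustration.
@configure(profile=['EXECUTOR_MEMORY_MEDIUM'])
@transform_df(
    Output('/examples/illustrative_output'),
    source=Input('/examples/illustrative_input')
)
def my_compute_function(source):
    return source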

It’s important to keep in mind that data transformations can be expressed in terms of pyspark.sql.DataFrame objects as well as files. For transformations that rely on DataFrame objects, you can either use the transform decorator and explicitly call a method to access a DataFrame containing your input dataset, or you can simply use the DataFrame transform decorator. For transformations that rely on files, you can use the transform decorator and then access files within your datasets. If your data transformation uses the Pandas library exclusively, you can use the Pandas transform decorator.

You can define multiple Transform objects in a single Python file. Also, all transformations currently run with transaction type SNAPSHOT.

Transforms

Decorator

The transforms.api.transform() decorator can be used if you’re writing data transformations that depend on DataFrame objects or files. This decorator accepts as keyword arguments a number of transforms.api.Input and transforms.api.Output specifications. During a Foundry build, these specifications are resolved into transforms.api.TransformInput and transforms.api.TransformOutput objects, respectively. These TransformInput and TransformOutput objects provide access to the dataset within the compute function.

The keyword names used for the inputs and outputs must correspond to the parameter names of the wrapped compute function.

Let’s step through a simple example for creating a Transform object using the transform() decorator. We will use a small sample dataset called /examples/students_hair_eye_color. Here is a preview of the dataset:

>>> students_input = foundry.input('/examples/students_hair_eye_color')
>>> students_input.dataframe().sort('id').show(n=3)
+---+-----+-----+----+
| id| hair|  eye| sex|
+---+-----+-----+----+
|  1|Black|Brown|Male|
|  2|Brown|Brown|Male|
|  3|  Red|Brown|Male|
+---+-----+-----+----+
only showing top 3 rows

Now, we can define a Transform that takes /examples/students_hair_eye_color as its input and creates /examples/hair_eye_color_processed as its output:

from transforms.api import transform, Input, Output


@transform(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(hair_eye_color, processed):
    # type: (TransformInput, TransformOutput) -> None
    filtered_df = hair_eye_color.dataframe().filter(hair_eye_color.dataframe().hair == 'Brown')
    processed.write_dataframe(filtered_df)

Note that the input name of “hair_eye_color” and the output name of “processed” are used as the parameter names in the filter_hair_color compute function. Furthermore, filter_hair_color reads a DataFrame from the TransformInput using the dataframe() method. The DataFrame is then filtered using filter(), which is a regular PySpark function. This filtered DataFrame is then written to the output named “processed” using the write_dataframe() method.

The DataFrame objects returned by a TransformInput are regular PySpark DataFrames. For more information about working with PySpark, you can refer to the Spark Python API documentation ↗ available online.
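
As a quick illustration, any standard PySpark operation can be applied to the DataFrame returned by dataframe(). The following sketch groups the sample dataset by hair and eye color; the output path is a hypothetical placeholder:

from transforms.api import transform, Input, Output
from pyspark.sql import functions as F

# A sketch using ordinary PySpark operations; the output path is a placeholder.
@transform(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    summary=Output('/examples/hair_eye_color_summary')
)
def summarize_hair_eye_color(hair_eye_color, summary):
    df = hair_eye_color.dataframe()
    counts = df.groupBy('hair', 'eye').agg(F.count('id').alias('num_students'))
    summary.write_dataframe(counts)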

If your data transformation relies on access to files, rather than DataFrame objects, refer to the section on file access.
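
As a brief preview of that section, file-based access typically goes through the filesystem() method on the input and output objects. The following is only a sketch; the dataset paths and the file name are assumptions:

from transforms.api import transform, Input, Output

# A sketch only: assumes the input dataset contains a file named 'students.csv'.
@transform(
    raw=Input('/examples/students_raw_files'),
    out=Output('/examples/students_raw_files_copy')
)
def copy_csv(raw, out):
    with raw.filesystem().open('students.csv') as source_file:
        with out.filesystem().open('students.csv', 'w') as destination_file:
            destination_file.write(source_file.read())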

Multiple outputs

Transforms with multiple outputs are useful when a single input dataset needs to be broken into several parts. Multiple-output Transforms are only supported with the transforms.api.transform() decorator. Recall the /examples/students_hair_eye_color dataset:

>>> students_input = foundry.input('/examples/students_hair_eye_color')
>>> students_input.dataframe().sort('id').show(n=3)
+---+-----+-----+----+
| id| hair|  eye| sex|
+---+-----+-----+----+
|  1|Black|Brown|Male|
|  2|Brown|Brown|Male|
|  3|  Red|Brown|Male|
+---+-----+-----+----+
only showing top 3 rows

We can now pass multiple Output specifications to the transform() decorator in order to split the input:

from transforms.api import transform, Input, Output


@transform(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    males=Output('/examples/hair_eye_color_males'),
    females=Output('/examples/hair_eye_color_females'),
)
def brown_hair_by_sex(hair_eye_color, males, females):
    # type: (TransformInput, TransformOutput, TransformOutput) -> None
    brown_hair_df = hair_eye_color.dataframe().filter(hair_eye_color.dataframe().hair == 'Brown')
    males.write_dataframe(brown_hair_df.filter(brown_hair_df.sex == 'Male'))
    females.write_dataframe(brown_hair_df.filter(brown_hair_df.sex == 'Female'))

Notice that we only had to filter down to brown hair once; we could then reuse the filtered DataFrame to generate multiple output datasets.

DataFrame Transform decorator

It’s common for data transformations in Python to read, process, and write DataFrame objects. If your data transformation depends on DataFrame objects, you can use the transforms.api.transform_df() decorator. This decorator injects DataFrame objects and expects the compute function to return a DataFrame. Alternatively, you can use the more general transform() decorator and explicitly call the dataframe() method to access a DataFrame containing your input dataset. Recall that the transform() decorator injects the more powerful transforms.api.TransformInput and transforms.api.TransformOutput objects, rather than DataFrame objects.

The transform_df() decorator accepts as keyword arguments a number of transforms.api.Input specifications, and it accepts as a positional argument a single transforms.api.Output specification. As required by Python, the positional Output argument must appear first, followed by the keyword Input arguments.

Let’s step through a simple example for creating a Transform object using the transform_df() decorator. We will use the small sample dataset from above called /examples/students_hair_eye_color. Here is a preview of the dataset:

>>> students_input = foundry.input('/examples/students_hair_eye_color')
>>> students_input.dataframe().sort('id').show(n=3)
+---+-----+-----+----+
| id| hair|  eye| sex|
+---+-----+-----+----+
|  1|Black|Brown|Male|
|  2|Brown|Brown|Male|
|  3|  Red|Brown|Male|
+---+-----+-----+----+
only showing top 3 rows

Now, we will modify the example from the Transform decorator section above to use the transform_df() decorator. We define a Transform that takes /examples/students_hair_eye_color as its input and creates /examples/hair_eye_color_processed as its output:

from transforms.api import transform_df, Input, Output


@transform_df(
    Output('/examples/hair_eye_color_processed'),
    hair_eye_color=Input('/examples/students_hair_eye_color')
)
def filter_hair_color(hair_eye_color):
    # type: (pyspark.sql.DataFrame) -> pyspark.sql.DataFrame
    return hair_eye_color.filter(hair_eye_color.hair == 'Brown')

Note that the input name of “hair_eye_color” is used as the parameter name in the filter_hair_color compute function. Furthermore, since Python requires positional arguments to come before keyword arguments, the Output argument appears before any Input arguments.

Pandas Transform decorator

Warning

The transform_pandas decorator should only be used on datasets that can fit into memory. Otherwise, you should write your data transformation using the transform_df decorator and filter your input datasets before converting them to Pandas DataFrames using the toPandas method.

We recommend using the toPandas method with PyArrow added as a dependency in your meta.yml. This enables Pandas DataFrame conversion optimization with Arrow ↗.
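
A sketch of this recommended pattern follows, using the Transform context described further below to convert back to Spark; the dataset paths and the extra pandas step are illustrative assumptions:

from transforms.api import transform, Input, Output

# A sketch only: filter with Spark first, convert the now-small result with toPandas,
# do pandas-only work, then convert back via the Spark session. Requires pandas
# (and ideally pyarrow) as run dependencies in meta.yml.
@transform(
    processed=Output('/examples/hair_eye_color_brown_pandas'),
    hair_eye_color=Input('/examples/students_hair_eye_color')
)
def filter_then_convert(ctx, hair_eye_color, processed):
    filtered = hair_eye_color.dataframe().filter(hair_eye_color.dataframe().hair == 'Brown')
    pandas_df = filtered.toPandas()                      # safe only once the data is small
    pandas_df = pandas_df[pandas_df['eye'] == 'Brown']   # example pandas-only step
    processed.write_dataframe(ctx.spark_session.createDataFrame(pandas_df))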

If your data transformation depends exclusively on the Pandas library, you can use the transforms.api.transform_pandas() decorator. To use the Pandas library, you must add pandas as a run dependency in your meta.yml file. For more information, refer to the section describing the meta.yml file.

The transform_pandas() decorator is similar to the transform_df() decorator, but transform_pandas() converts the input datasets into pandas.DataFrame objects and accepts a return type of pandas.DataFrame.

The transform_pandas() decorator accepts as keyword arguments a number of transforms.api.Input specifications, and it accepts as a positional argument a single transforms.api.Output specification. As required by Python, the positional Output argument must appear first, followed by the keyword Input argument.

Let’s step through a simple example for creating a Transform object using the transform_pandas() decorator. We will use the same sample dataset from above called /examples/students_hair_eye_color. Here is a preview of the dataset:

>>> students_input = foundry.input('/examples/students_hair_eye_color')
>>> students_input.dataframe().sort('id').show(n=3)
+---+-----+-----+----+
| id| hair|  eye| sex|
+---+-----+-----+----+
|  1|Black|Brown|Male|
|  2|Brown|Brown|Male|
|  3|  Red|Brown|Male|
+---+-----+-----+----+
only showing top 3 rows

Now, we can define a Transform that takes /examples/students_hair_eye_color as its input and creates /examples/hair_eye_color_processed as its output:

from transforms.api import transform_pandas, Input, Output


@transform_pandas(
    Output('/examples/hair_eye_color_processed'),
    hair_eye_color=Input('/examples/students_hair_eye_color')
)
def filter_hair_color(hair_eye_color):
    # type: (pandas.DataFrame) -> pandas.DataFrame
    return hair_eye_color[hair_eye_color['hair'] == 'Brown']

Note that the input name of “hair_eye_color” is used as the parameter name in the filter_hair_color compute function. Furthermore, since Python requires positional arguments to come before keyword arguments, the Output argument appears before any Input arguments. This example creates a Transform from a compute function that accepts and returns pandas.DataFrame objects, rather than the pyspark.sql.DataFrame objects used in the DataFrame Transform decorator section above. Note that you can convert a Pandas DataFrame to a PySpark DataFrame by calling the createDataFrame() method on the spark_session attribute of your Transform context.
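
For example, here is a minimal sketch of that conversion; the dataset path and columns are illustrative, and the ctx parameter is explained in the Transform context section below:

from transforms.api import transform, Output
import pandas as pd

# A sketch only: build a pandas DataFrame, then convert it to a PySpark DataFrame
# using the Spark session exposed on the Transform context.
@transform(
    out=Output('/examples/pandas_to_spark')
)
def pandas_to_spark(ctx, out):
    pdf = pd.DataFrame({'letter': ['a', 'b', 'c'], 'number': [1, 2, 3]})
    out.write_dataframe(ctx.spark_session.createDataFrame(pdf))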

Transform context

There may be cases when a data transformation depends on things other than its input datasets. For instance, a transformation may be required to access the current Spark session or to contact an external service. In such cases, you can inject a transforms.api.TransformContext object into the transformation.

To inject a TransformContext object, your compute function must accept a parameter called ctx. Note that this also means that no inputs or outputs may be named ctx. For instance, you can use a TransformContext object if you want to create a DataFrame from Python data structures:

from transforms.api import transform, Output


@transform(
    out=Output('/examples/context')
)
def generate_dataframe(ctx, out):
    # type: (TransformContext, TransformOutput) -> None
    df = ctx.spark_session.createDataFrame([
        ['a', 1], ['b', 2], ['c', 3]
    ], schema=['letter', 'number'])
    out.write_dataframe(df)

Transform logic level versioning

Warning

For TLLV to function correctly, your code must declare all imports at the module level and must not attempt to patch or otherwise modify objects in another module. If this is not the case in your project, you must disable TLLV; refer to the code example below for more information. TLLV is enabled by default. To disable it, set tllv to false in the transformsPython configuration, which lives in the build.gradle file of your Transforms Python subproject.

transformsPython {
    tllv false
}

A Transform’s version is a string used to compare two versions of a transform when determining logic staleness. A Transform’s output is up to date if its inputs are unchanged and the transform’s version is unchanged. If the version changes, the transform’s output is invalidated and recomputed.

By default, a transform’s version includes the following:

  • the module where the transform is defined,
  • all modules which the transform depends on, and
  • any project dependencies

If any of these change, the version string changes. If you want to invalidate outputs when a change happens in a file not covered by the parts listed above, set tllvFiles in the transformsPython configuration. An example use case: you read configuration from a file and want to invalidate outputs when that configuration changes.

transformsPython {
    tllvFiles = [
        'path/to/file/you/want/to/include/relative/to/project/directory'
    ]
}

If you want to avoid invalidating outputs when any project dependency version changes, set tllvIncludeDeps to false.

transformsPython {
    tllvIncludeDeps false
}

Consider the following code example of valid and invalid imports:

# If you're only importing at the module top, you have nothing to worry about.
from transforms.api import transform_df, Input, Output
from myproject.datasets import utils
from myproject.testing import test_mock as tmock
import importlib

# Using `__import__` or `importlib` is fine as long as it happens in module scope.
logging = __import__('logging')
my_compute = importlib.import_module('myproject.compute')


def helper(x):
    # This is invalid, you must disable TLLV if you import in functions or class methods.
    # All imports must be in module scope.
    import myproject.helpers as myhelp
    return myhelp.round(x)


@transform_df(
    Output("/path/to/output/dataset"),
    my_input=Input("/path/to/input/dataset"),
)
def my_compute_function(my_input):
    # This is invalid, you must disable TLLV if you want to use any way of importing in functions!
    ihelper = __import__('myproject.init_helper')
    my_functions = importlib.import_module('myproject.functions')
    return my_input

You must disable TLLV if you’re using extension modules ↗.

Pipelines

Each Transforms Python subproject within a repository exposes a single transforms.api.Pipeline object. This Pipeline object is used to:

  1. Register datasets in Foundry with instructions for how to build them, and
  2. Locate and execute the transforms.api.Transform object responsible for building a given dataset during a Foundry build.

Entry point

The runtime responsible for executing a Python transformation needs to be able to find the project’s Pipeline. To export a Pipeline, you add it to the entry_points argument in the setup.py file in a Transforms Python subproject. For more information about entry points, you can refer to the setuptools library documentation ↗.

By default, each Python subproject is required to export a transforms.pipelines entry point named root. Recall that an entry point is defined in the setup.py file located in the root directory of a Python subproject. The entry point references the module name as well as the Pipeline attribute.

For instance, say you have a Pipeline called “my_pipeline” defined in myproject/pipeline.py:

from transforms.api import Pipeline

my_pipeline = Pipeline()

You can register this Pipeline in setup.py by doing the following:

import os
from setuptools import find_packages, setup

setup(
    entry_points={
        'transforms.pipelines': [
            'root = myproject.pipeline:my_pipeline'
        ]
    }
)

In the code above, root refers to the name of the Pipeline you’re exporting, myproject.pipeline refers to the module containing your Pipeline, and my_pipeline refers to the Pipeline attribute defined in that module.

Adding transforms to a pipeline

Once a Transform object associated with your project’s Pipeline declares a dataset as an Output, you can build this dataset in Foundry. The two recommended ways to add Transform objects to a Pipeline are manual registration and automatic registration.

Tip

If you have a more advanced workflow and/or want to explicitly add each Transform object to your project’s Pipeline, you can use manual registration. Otherwise, it’s highly recommended to use automatic registration to ensure that your registration code is concise and contained. With automatic registration, the discover_transforms method recursively discovers any Transform objects defined at the module-level. Refer to the sections below for more information.

Automatic registration

Warning

The discover_transforms method imports every module it finds. As a result, any code within your imported modules will be executed.

As the complexity of a project grows, manually adding Transform objects to a Pipeline can become unwieldy. Thus, the Pipeline object provides the discover_transforms() method to recursively discover all Transform objects within a Python module or package.

from transforms.api import Pipeline
import my_module

my_pipeline = Pipeline()
my_pipeline.discover_transforms(my_module)

Manual registration

Transform objects can be manually added to a Pipeline using the add_transforms() function. This function takes any number of Transform objects and adds them to the Pipeline. It also checks that no two Transform objects declare the same output dataset.

from transforms.api import transform_df, Pipeline, Input, Output


@transform_df(
    Output('/path/to/output/dataset'),
    my_input=Input('/path/to/input/dataset')
)
def my_compute_function(my_input):
    return my_input


my_pipeline = Pipeline()
my_pipeline.add_transforms(my_compute_function)

Transform generation

Warning

If you want to define a data transformation that creates multiple outputs, you can either use Transform generation or define a multiple-output Transform. With Transform generation, it may be necessary to read and process the same input once for every output. With a multiple-output Transform, it is possible to read and process the input just once.

You may want to re-use the same data transformation logic across multiple Transform objects. For instance, consider the following scenarios:

  • You have an input dataset with information about various states. You have code that filters down the input by state and then calculates various statistics.
  • You have multiple input datasets that may contain null values. You have code that removes any nulls.

In both cases, it would be useful to use the same data transformation code across multiple Transforms. Instead of separately defining a Transform object for each of your outputs, you can generate Transform objects using a for-loop and then register them in bulk to your project’s Pipeline. Here is an example for generating Transforms:

from transforms.api import transform_df, Input, Output


def transform_generator(sources):
    # type: (List[str]) -> List[transforms.api.Transform]
    transforms = []
    # This example uses multiple input datasets. You can also generate multiple outputs
    # from a single input dataset.
    for source in sources:
        @transform_df(
            Output('/sources/{source}/output'.format(source=source)),
            my_input=Input('/sources/{source}/input'.format(source=source))
        )
        def compute_function(my_input, source=source):
            # To capture the source variable in the function, you pass it as a defaulted keyword argument.
            return my_input.filter(my_input.source == source)
        transforms.append(compute_function)
    return transforms


TRANSFORMS = transform_generator(['src1', 'src2', 'src3'])

Warning

To capture the source variable in the function, you must pass it as the defaulted keyword argument source in your compute function.

Warning

When using a loop to generate Transforms, the loop for generating your Transform objects must be inside a function, since Python for-loops do not create new scopes. If a function is not used, automatic registration will mistakenly discover only the final Transform object defined in your for-loop. The function should return a list of the generated Transform objects, and its return value should be assigned to a module-level variable. Following these criteria within a module that is set up to be discovered via automatic registration allows you to use automatic registration with generated Transforms. Alternatively, you can use manual registration. The underlying scoping behavior is illustrated below.
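
The scoping behavior can be seen outside of Transforms entirely. This small, Foundry-independent sketch shows why the defaulted keyword argument is needed:

# Closures in a loop capture the variable, not its value at each iteration.
late_bound = [lambda: i for i in range(3)]
print([f() for f in late_bound])            # prints [2, 2, 2]

# A defaulted keyword argument captures the current value on each iteration.
early_bound = [lambda i=i: i for i in range(3)]
print([f() for f in early_bound])           # prints [0, 1, 2]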

Warning

If the list of input datasets changes between builds (for example, if the list of input datasets is read from a file that is modified between builds), the build will fail because the new dataset references will not be found in the job specification for the build.

Dynamic input or output naming is not possible in Transforms. When the CI job runs, all the relationships between inputs and outputs are determined, including the links between unique identifiers and dataset names. Output datasets that do not exist are created, and a JobSpec is added to them.

Whenever a dataset is built, the reference to the repository, source file, and entry point of the function that creates the dataset is obtained from the JobSpec. The build process is then initiated, and your function is called to generate the final result. Therefore, if your inputs or outputs have changed and the build process is launched, it will lead to an error because the JobSpecs are no longer valid: the connection between the unique identifier and the dataset name has been broken.

If using manual registration, you can then add the generated transforms into the pipeline. If you are unfamiliar with the * syntax, refer to this tutorial.

from transforms.api import Pipeline
import my_module

my_pipeline = Pipeline()
my_pipeline.add_transforms(*my_module.TRANSFORMS)

Warning

Note that the Build button in Code Repositories may not work for manual registration and will present a “No transforms discovered in the pipeline from the requested file” error. You can still build these datasets with the Data Lineage or Dataset Preview applications.