Transforms and pipelines

All transformations currently default to transaction type SNAPSHOT.

A Transform is a description of how to compute a dataset. It describes the following:

  • The input and output datasets,
  • The code used to transform the input datasets into the output dataset (we’ll refer to this as the compute function), and
  • Any additional configuration (such as the custom Transforms profiles to use at runtime).

The input and output datasets, as well as the transformation code, are specified in a Transform object and then registered to a Pipeline. How you can define a Transform depends on two factors:

Transform type

Tip

Data transformations can be expressed in terms of DataFrame objects as well as files. These DataFrame objects just refer to regular Spark DataFrames. In the Spark Scala/Java API, a DataFrame is represented by a Dataset. Thus, as a user, you directly interact with Dataset objects in your data transformation code.
For more information about working with Spark, you can refer to the Java API for Spark documentation ↗ that’s available online.
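For example, here is a minimal sketch of a transformation expressed against a Dataset<Row> using the standard Spark Java API (the class name, dataset, and "age"/"name" columns are illustrative, not part of the Transforms API):

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

final class SparkDatasetExample {
    // A DataFrame in the Java API is simply a Dataset<Row>, so ordinary
    // Spark operations such as filter() and select() apply directly.
    static Dataset<Row> adults(Dataset<Row> people) {
        return people.filter(col("age").geq(21)).select("name", "age");
    }
}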

For transformations that rely on DataFrame objects, you can:

  • Define a high-level Transform, which supports input and output of type Dataset<Row>, or
  • Define a low-level Transform and explicitly call a method to access the Dataset<Row> containing your input dataset.

For transformations that rely on files, you must define a low-level Transform and then access files within your datasets.

Here is a summary of the key differences between the two types of Transforms:

| Description | High-Level Transform | Low-Level Transform |
| --- | --- | --- |
| Allows for data transformations that depend on DataFrame objects | ✓ * | ✓ |
| Allows for data transformations that depend on access to files | | ✓ |
| Supports multiple input datasets | ✓ | ✓ |
| Supports multiple output datasets | | ✓ |
| Compute function must return a DataFrame value | ✓ | |
| Compute function writes to output, instead of returning a value | | ✓ |

* We recommend using high-level Transforms for data transformations that depend on DataFrame objects.

For both Transform types, you need to create a class that contains your compute function. Within this class, your compute function must be a public, non-static method that’s annotated with @Compute. Without this annotation, your data transformation code will not get correctly registered.
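As a rough sketch, the two compute-function shapes look like the following (class, method, and parameter names are placeholders, each class would live in its own file, and wiring the functions to actual datasets is covered under registration below):

package myproject.datasets;

import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// High-level Transform: the compute function receives and returns Dataset<Row> values.
public final class MyHighLevelTransform {
    @Compute
    public Dataset<Row> myComputeFunction(Dataset<Row> myInput) {
        return myInput.limit(10);
    }
}

// Low-level Transform: the compute function writes to its output instead of returning a value.
public final class MyLowLevelTransform {
    @Compute
    public void myComputeFunction(FoundryInput myInput, FoundryOutput myOutput) {
        myOutput.getDataFrameWriter(myInput.asDataFrame().read().limit(10)).write();
    }
}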

Registration type

Each Transforms Java subproject within a repository exposes a single Pipeline object. This Pipeline object is used to:

  1. Register datasets in Foundry with instructions for how to build them, and
  2. Locate and execute the Transform object responsible for building a given dataset during a Foundry build.

Entry point

The runtime responsible for executing a Java transformation needs to be able to find the project’s Pipeline. Note that Transforms Java uses the standard Java facility for service loading ↗.

In order to define a Pipeline object that is associated with your project, you must implement a PipelineDefiner object. In this PipelineDefiner object, you can add Transforms to your project’s Pipeline. Specifically, it’s required that each Java subproject implements a single PipelineDefiner object:

package myproject;

import com.palantir.transforms.lang.java.api.Pipeline;
import com.palantir.transforms.lang.java.api.PipelineDefiner;

public final class MyPipelineDefiner implements PipelineDefiner {

    @Override
    public void define(Pipeline pipeline) {
        // Code here to add Transforms to your project's Pipeline using
        // automatic or manual registration.
    }
}

Once you create a Java package and implement a PipelineDefiner object, you must update resources/META-INF/services/com.palantir.transforms.lang.java.api.PipelineDefiner to point to your PipelineDefiner implementation:

# Replace this with the class name for your "PipelineDefiner" implementation.
# Since each Java subproject implements a single "PipelineDefiner", this file
# can only contain a single entry.
myproject.MyPipelineDefiner

MyPipelineDefiner refers to the class name for your PipelineDefiner implementation.

Adding transforms to a pipeline

Once a Transform associated with your project’s Pipeline declares a dataset as an output, you can build this dataset in Foundry. The two recommended ways to add Transform objects to a Pipeline are manual registration and automatic registration.

Tip

If you have a more advanced workflow and/or want to explicitly add each Transform object to your project’s Pipeline, you can use manual registration. For instance, it’s useful to use manual registration if you want to meta-programmatically apply the same data transformation logic to multiple input and output dataset combinations.

Otherwise, it’s highly recommended to use automatic registration to ensure that your registration code is concise and contained. With automatic registration, the Pipeline.autoBindFromPackage() method discovers any Transform definitions in a package (provided that these objects have the required @Input and @Output annotations).

Automatic registration

As the complexity of a project grows, manually adding Transform objects to a Pipeline can become unwieldy. Thus, the Pipeline object provides the autoBindFromPackage() method to discover all Transform objects within a Java package. To use automatic registration, you must do the following:

  • Define a class corresponding to your Transform. With automatic registration, you define a class that contains information about your input and output datasets as well as your compute function.
  • Add the required @Input and @Output annotations.
  • Call the Pipeline.autoBindFromPackage() method to register any Transform definitions in your provided Java package. The autoBindFromPackage() method will only register Transform definitions that have the required annotations. Any Transforms that do not have the required annotations will not be added to your project’s Pipeline, even if they are in the Java package you provide to the autoBindFromPackage() method; see the example below.
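For example, an automatically registered high-level Transform and the corresponding autoBindFromPackage() call could look like the following sketch (the package name and dataset paths are placeholders):

package myproject.datasets;

import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public final class MyAutomaticTransform {

    // The @Input and @Output annotations give autoBindFromPackage() the dataset
    // paths to wire this compute function to. The paths shown are placeholders.
    @Compute
    @Output("/path/to/output/dataset")
    public Dataset<Row> myComputeFunction(@Input("/path/to/input/dataset") Dataset<Row> myInput) {
        return myInput.limit(10);
    }
}

Within your PipelineDefiner implementation:

package myproject;

import com.palantir.transforms.lang.java.api.Pipeline;
import com.palantir.transforms.lang.java.api.PipelineDefiner;

public final class MyPipelineDefiner implements PipelineDefiner {

    @Override
    public void define(Pipeline pipeline) {
        // Discover and register every annotated Transform in the given package.
        pipeline.autoBindFromPackage("myproject.datasets");
    }
}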

Manual registration

Transform objects can manually be added to a Pipeline using the Pipeline.register() method. Each call to this method can register one Transform. In order to use manual registration with Transforms, you must do the following:

  • Define a class containing the compute function for your Transform object. Unlike automatic registration, with manual registration you provide information about your input and output datasets within your PipelineDefiner implementation.
  • Use HighLevelTransform.builder() or LowLevelTransform.builder() to specify which compute function to use and to provide your input and output datasets.
  • Call the Pipeline.register() method to explicitly add your Transform definitions to your project’s Pipeline; a sketch follows the warning below.
Warning

Note that annotations such as @StopPropagating and @StopRequiring are only supported for automatically registered Java transforms.
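For manual registration, the define() method explicitly builds and registers each Transform. The sketch below assumes a low-level compute-function class named MyLowLevelComputeFunction with parameters named "myInput" and "myOutput"; the builder method names reflect a typical Transforms Java setup and should be verified against your project, and the dataset paths are placeholders:

package myproject;

import com.palantir.transforms.lang.java.api.LowLevelTransform;
import com.palantir.transforms.lang.java.api.Pipeline;
import com.palantir.transforms.lang.java.api.PipelineDefiner;

public final class MyPipelineDefiner implements PipelineDefiner {

    @Override
    public void define(Pipeline pipeline) {
        // Wire the compute function's "myInput"/"myOutput" parameters to datasets,
        // then add the resulting Transform to the project's Pipeline.
        LowLevelTransform lowLevelManualTransform = LowLevelTransform.builder()
                .computeFunctionInstance(new MyLowLevelComputeFunction())
                .putParameterToInputAlias("myInput", "/path/to/input/dataset")
                .putParameterToOutputAlias("myOutput", "/path/to/output/dataset")
                .build();
        pipeline.register(lowLevelManualTransform);
    }
}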

Transform context

There may be cases when a data transformation depends on things other than its input datasets. For instance, a transformation may need to access the current Spark session or the Transforms parameters in its jobSpec. In such cases, you can inject a TransformContext object into the transformation: your compute function must accept a parameter of type TransformContext. The TransformContext contains the Transforms authHeader, the Spark session, the transform parameters, and a ServiceDiscovery object, which exposes the service URIs of discovered Foundry services.

package myproject.datasets;

import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.TransformContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
 * This is an example high-level Transform that accesses the TransformContext.
 */
public final class HighLevelContextExample {
    // The enclosing class is shown for completeness; its name is illustrative.

    @Compute
    public Dataset<Row> myComputeFunction(Dataset<Row> myInput, TransformContext context) {
        int limit = (int) context.parameters().get("limit");
        return myInput.limit(limit);
    }
}
package myproject.datasets;

import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.TransformContext;

/**
 * This is an example low-level Transform that accesses the TransformContext.
 */
public final class LowLevelContextExample {
    // The enclosing class is shown for completeness; its name is illustrative.

    @Compute
    public void compute(FoundryInput input, FoundryOutput output, TransformContext context) {
        int limit = (int) context.parameters().get("limit");
        output.getDataFrameWriter(input.asDataFrame().read().limit(limit)).write();
    }
}