All transformations currently default to transaction type SNAPSHOT.
A Transform is a description of how to compute a dataset: it describes the input and output datasets as well as the transformation code. The input and output datasets and the transformation code are specified in a Transform object and then registered to a Pipeline. How you can define a Transform depends on whether your data transformation is expressed in terms of DataFrame objects or in terms of files, and on whether you use a high-level or a low-level Transform.
Data transformations can be expressed in terms of DataFrame objects as well as files. These DataFrame objects refer to regular Spark DataFrames. In the Spark Scala/Java API, a DataFrame is represented by a Dataset<Row>, so in your data transformation code you interact directly with Dataset<Row> objects.

For more information about working with Spark, refer to the Java API for Spark documentation ↗ available online.
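These Dataset<Row> objects support the standard Spark SQL operations. As a small, hedged illustration (plain Spark, nothing Foundry-specific; the column names are invented), a helper used inside a compute function might look like this:

```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public final class SparkDatasetExample {

    // Plain Spark Dataset<Row> manipulation: keep rows where the invented
    // "age" column is at least 21, then project two columns.
    static Dataset<Row> keepAdults(Dataset<Row> people) {
        return people.filter(col("age").geq(21)).select("name", "age");
    }
}
```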
For transformations that rely on DataFrame objects, you can:

- Define a high-level Transform whose compute function accepts inputs of type Dataset<Row> and returns a Dataset<Row>, or
- Define a low-level Transform and call asDataFrame() on an input to obtain a Dataset<Row> containing your input dataset.

For transformations that rely on files, you must define a low-level Transform and then access files within your datasets.
Here is a summary of the key differences between the two types of Transforms:
Description | High-Level Transform | Low-Level Transform |
---|---|---|
Allows for data transformations that depend on DataFrame objects | ✓ * | ✓ |
Allows for data transformations that depend on access to files | | ✓ |
Supports multiple input datasets | ✓ | ✓ |
Supports multiple output datasets | | ✓ |
Compute function must return a DataFrame value | ✓ | |
Compute function writes to output, instead of returning a value | | ✓ |

* We recommend using high-level Transforms for data transformations that depend on DataFrame objects.
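To make the high-level column concrete, here is a minimal sketch of a high-level Transform class. Treat it as a sketch only: the dataset paths are placeholders, and it assumes the automatic-registration annotation placement described later on this page (@Input on the input parameters, @Output on the compute method).

```java
package myproject.datasets;

import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
 * Sketch of a high-level Transform: the compute function receives its inputs
 * as Dataset<Row> objects and returns the Dataset<Row> that becomes its
 * single output dataset. Dataset paths are placeholders.
 */
public final class HighLevelExample {

    @Compute
    @Output("/examples/output_dataset")
    public Dataset<Row> myComputeFunction(
            @Input("/examples/input_dataset") Dataset<Row> myInput) {
        // Any Spark transformation can go here; the returned value is written
        // to the declared output dataset.
        return myInput.limit(10);
    }
}
```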
For both Transform types, you need to create a class that contains your compute function. Within this class, your compute function must be a public, non-static method annotated with @Compute. Without this annotation, your data transformation code will not be correctly registered.
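For comparison, here is a minimal sketch of a low-level Transform class that meets the same requirement: a public, non-static method annotated with @Compute. It assumes that FoundryInput and FoundryOutput live in the same com.palantir.transforms.lang.java.api package as the other classes shown on this page; the dataset paths are again placeholders.

```java
package myproject.datasets;

import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;

/**
 * Sketch of a low-level Transform: the compute function writes to its output
 * parameter instead of returning a value. Dataset paths are placeholders.
 */
public final class LowLevelExample {

    @Compute
    public void myComputeFunction(
            @Input("/examples/input_dataset") FoundryInput myInput,
            @Output("/examples/output_dataset") FoundryOutput myOutput) {
        // Read the input as a Spark DataFrame and write it straight through
        // to the output dataset.
        myOutput.getDataFrameWriter(myInput.asDataFrame().read()).write();
    }
}
```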
Each Transforms Java subproject within a repository exposes a single Pipeline object. This Pipeline object is used to register your Transform objects and to find the Transform object responsible for building a given dataset during a Foundry build. The runtime responsible for executing a Java transformation needs to be able to find the project's Pipeline; note that Transforms Java uses the standard Java facility for service loading ↗ for this.
In order to define a Pipeline object that is associated with your project, you must implement a PipelineDefiner object. In this PipelineDefiner object, you add Transforms to your project's Pipeline. Specifically, each Java subproject must implement a single PipelineDefiner object:
```java
package myproject;

import com.palantir.transforms.lang.java.api.Pipeline;
import com.palantir.transforms.lang.java.api.PipelineDefiner;

public final class MyPipelineDefiner implements PipelineDefiner {

    @Override
    public void define(Pipeline pipeline) {
        // Code here to add Transforms to your project's Pipeline using
        // automatic or manual registration.
    }
}
```
Once you create a Java package and implement a PipelineDefiner object, you must update resources/META-INF/services/com.palantir.transforms.lang.java.api.PipelineDefiner to point to your PipelineDefiner implementation:
```
// Replace this with the class name for your "PipelineDefiner" implementation.
// Since each Java subproject implements a single "PipelineDefiner", this file
// can only contain a single entry.
myproject.MyPipelineDefiner
```
MyPipelineDefiner refers to the class name for your PipelineDefiner implementation.
Once a Transform associated with your project's Pipeline declares a dataset as an output, you can build this dataset in Foundry. The two recommended ways to add Transform objects to a Pipeline are manual registration and automatic registration.

If you have a more advanced workflow and/or want to explicitly add each Transform object to your project's Pipeline, you can use manual registration. For instance, manual registration is useful if you want to meta-programmatically apply the same data transformation logic to multiple combinations of input and output datasets.
Otherwise, it's highly recommended to use automatic registration to keep your registration code concise and contained. With automatic registration, the Pipeline.autoBindFromPackage() method discovers any Transform definitions in a package, provided that these objects have the required @Input and @Output annotations.
As the complexity of a project grows, manually adding Transform objects to a Pipeline can become unwieldy. Thus, the Pipeline object provides the autoBindFromPackage() method to discover all Transform objects within a Java package. To use automatic registration, you must do the following:
- Define your Transform. With automatic registration, you define a class that contains information about your input and output datasets as well as your compute function.
- Provide information about your input and output datasets using the required @Input and @Output annotations.
- Call the Pipeline.autoBindFromPackage() method to register any Transform definitions in your provided Java package. The autoBindFromPackage() method will only register Transform definitions that have the required annotations; any Transforms that do not have the required annotations will not be added to your project's Pipeline, even if these Transforms are in the Java package you provide to the autoBindFromPackage() method.
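As a sketch, a PipelineDefiner that relies on automatic registration might fill in its define method as follows. This assumes autoBindFromPackage() accepts the package name as a string; myproject.datasets is a placeholder package name.

```java
package myproject;

import com.palantir.transforms.lang.java.api.Pipeline;
import com.palantir.transforms.lang.java.api.PipelineDefiner;

public final class MyPipelineDefiner implements PipelineDefiner {

    @Override
    public void define(Pipeline pipeline) {
        // Register every properly annotated Transform found in the given
        // package. Assumes the package name is passed as a string.
        pipeline.autoBindFromPackage("myproject.datasets");
    }
}
```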
Transform objects can be manually added to a Pipeline using the Pipeline.register() method. Each call to this method can register a single Transform. In order to use manual registration with Transforms, you must do the following:
- Define your Transform object. Unlike automatic registration, with manual registration you provide information about your input and output datasets within your PipelineDefiner implementation.
- Use HighLevelTransform.builder() or LowLevelTransform.builder() to specify which compute function to use as well as to provide your input and output datasets.
- Call the Pipeline.register() method to explicitly add your Transform definitions to your project's Pipeline.

Note that annotations such as @StopPropagating and @StopRequiring are only supported for automatically registered Java transforms.
There may be cases when a data transformation depends on things other than its input datasets. For instance, a transformation may need to access the current Spark session or the transforms parameters in the jobSpec. In such cases, you can inject a TransformContext object into the transformation. To do this, your compute function must accept a parameter of type TransformContext. A TransformContext contains the Transforms authHeader, the Spark session, the transform parameters, and a ServiceDiscovery object. The ServiceDiscovery class exposes the service URIs of discovered Foundry services.
```java
package myproject.datasets;

import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;
import com.palantir.transforms.lang.java.api.TransformContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
 * This is an example high-level Transform that accesses the TransformContext.
 */
@Compute
public Dataset<Row> myComputeFunction(Dataset<Row> myInput, TransformContext context) {
    int limit = (int) context.parameters().get("limit");
    return myInput.limit(limit);
}
```
```java
package myproject.datasets;

import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.TransformContext;

/**
 * This is an example low-level Transform that accesses the TransformContext.
 */
@Compute
public void compute(FoundryInput input, FoundryOutput output, TransformContext context) {
    int limit = (int) context.parameters().get("limit");
    output.getDataFrameWriter(input.asDataFrame().read().limit(limit)).write();
}
```