Incremental computation is an advanced feature. Ensure that you understand the rest of the user guide before making use of this feature.
The transforms shown so far in the user guide recompute their entire output datasets every time they are run. This can lead to a lot of unnecessary work. Consider the following example:
package myproject.datasets;

import com.palantir.transforms.lang.java.api.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public final class FilterTransform {

    @Compute
    public void myComputeFunction(
            @Input("/examples/students_hair_eye_color") FoundryInput myInput,
            @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) {
        Dataset<Row> inputDf = myInput.asDataFrame().read();
        myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write();
    }
}
If any new data is added to the /examples/students_hair_eye_color input dataset, the filter() is performed over the entire input rather than just the new data added to the input. This is wasteful of compute resources and time.
If a transform can become aware of its build history, it can be smarter about how to compute its output. More specifically, it can use the changes made to the inputs to modify the output dataset. This process of using already materialized data when re-materializing tables is called incremental computation. Without incremental computation, the output dataset is always replaced by the latest output of the transform.
Let's go back to the example transform shown above. The transform performs a filter() over the students dataset to write out students with brown hair. With incremental computation, if data about two new students is appended to students, the transform can use information about its build history to append only the new brown-haired students to the output:
       RAW                             DERIVED

+---+-----+-----+                 +---+-----+-----+
| id| hair|  eye|                 | id| hair|  eye|
+---+-----+-----+     Build 1     +---+-----+-----+
| 17|Black|Brown|   --------->    | 18|Brown|Brown|
| 18|Brown|Brown|                 +---+-----+-----+
| 19|  Red|Brown|
+---+-----+-----+                        ...
       ...
+---+-----+-----+     Build 2     +---+-----+-----+
| 20|Brown|Brown|   --------->    | 20|Brown|Brown|
| 21|Black|Blue |                 +---+-----+-----+
+---+-----+-----+
This guide walks you step by step through writing an incremental transform with transforms-java. Unlike transforms-python, transforms-java does not use annotations to automatically verify incrementality and apply transformations incrementally. Instead, writing incremental transforms in Java is controlled directly by the user, who explicitly decides when a transformation should act incrementally and when it should not. By interpreting how the input dataset was modified, the user can decide whether to update the output dataset incrementally or in snapshot fashion.
The first step is to interpret your input. The input dataset can be modified in multiple ways, and incremental transformations can only be applied in some specific circumstances. DataFrameModificationType (or FilesModificationType, for file-based inputs) expresses the different ways a dataset can be modified. The different modes are:

- UNCHANGED: the input dataset has not been modified since the last build of this transform.
- APPENDED: only new data has been appended to the input dataset.
- UPDATED: the input dataset has been modified in a way that includes changes to already existing rows, in addition to any appends.
- NEW_VIEW: the input dataset has been replaced by an entirely new view (for example, following a snapshot transaction or a schema change), so the previous build history can no longer be relied upon.
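For example, here is a minimal sketch of inspecting the modification type, using the myInput parameter from the transform above:

// How was the input modified since this transform last processed it?
DataFrameModificationType modificationType = myInput.asDataFrame().modificationType();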
Based on how the input has changed we can make decisions on what to read from the input dataset and what to write to the output dataset.
Knowing how the input was modified allows us to read it accordingly. If a transaction only appended data, we can safely act incrementally and read only what was modified. If, instead, the change to the input dataset includes modifications to already existing rows, we may want to re-read the whole view. The transforms-java API allows different read modes for an input dataset through the readForRange() method.
ReadRange exposes the possible reading ranges. The ranges used in this guide are:

- UNPROCESSED: read only the portion of the input that this transform has not yet processed (for example, newly appended transactions).
- ENTIRE_VIEW: read the entire current view of the input dataset.
By interpreting the input modification type we can then decide how to read our data, as shown in the example below.
private ReadRange getReadRange(FoundryInput input) {
    switch (input.asDataFrame().modificationType()) {
        case UNCHANGED:
            LOG.info("No changes in input dataset, read only unprocessed");
            return ReadRange.UNPROCESSED;
        case APPENDED:
            LOG.info("Append-only changes in input dataset, read only unprocessed");
            return ReadRange.UNPROCESSED;
        case UPDATED:
            LOG.info("Update-type changes in input dataset, read entire view");
            return ReadRange.ENTIRE_VIEW;
        case NEW_VIEW:
            LOG.info("New view in input dataset, read entire view");
            return ReadRange.ENTIRE_VIEW;
        default:
            throw new IllegalArgumentException("Unknown ModificationType for input dataset "
                    + input.asDataFrame().modificationType());
    }
}
We can then modify our compute method accordingly.
@Compute
public void myComputeFunction(
        @Input("/examples/students_hair_eye_color") FoundryInput myInput,
        @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) {
    Dataset<Row> inputDf = myInput.asDataFrame().readForRange(getReadRange(myInput));
    myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write();
}
At this point we are only reading different portions of the input dataset; we are not yet acting differently on the output dataset. Running the code as shown so far will always result in a snapshot transaction on the output, no matter which portion of the input is read. Continue to the end of this guide before applying your incremental transforms, in order to understand how to correctly modify the output dataset.
In this step it is up to the user to apply whatever transformation of the data is needed. Remember that the data read will differ depending on how the input was modified. In our case the transformation is a simple filter for brown eyes, which we can isolate as:
inputDf = inputDf.filter("eye = 'Brown'");
Once we have interpreted the modifications to the input dataset, read the desired portion of the input, and transformed the data according to our transformation logic, we can write our output accordingly. WriteMode provides the different writing modes. The different modes are:

- UPDATE: add the newly written data to the existing output dataset (an incremental write).
- SNAPSHOT: replace the contents of the output dataset with the newly written data.
For example, in our case, we can choose the write mode based on the input modification type.
private WriteMode getWriteMode(FoundryInput input) {
    switch (input.asDataFrame().modificationType()) {
        case UNCHANGED:
            LOG.info("No changes in input dataset, writing in update mode");
            return WriteMode.UPDATE;
        case APPENDED:
            LOG.info("Append-only changes in input dataset, writing in update mode");
            return WriteMode.UPDATE;
        case UPDATED:
            LOG.info("Update-type changes in input dataset, writing in snapshot mode");
            return WriteMode.SNAPSHOT;
        case NEW_VIEW:
            LOG.info("New view in input dataset, writing in snapshot mode");
            return WriteMode.SNAPSHOT;
        default:
            throw new IllegalArgumentException("Unknown ModificationType for input dataset "
                    + input.asDataFrame().modificationType());
    }
}
Do not confuse WriteMode.UPDATE with DataFrameModificationType.UPDATED. The former is an incremental modification of the output dataset, which downstream datasets will see as DataFrameModificationType.APPENDED. The latter is a modification of the input dataset that includes both appends and changes to existing rows.
Finally, the write() function can be modified to include a write mode:
@Compute
public void myComputeFunction(
        @Input("/examples/students_hair_eye_color") FoundryInput myInput,
        @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) {
    Dataset<Row> inputDf = myInput.asDataFrame().readForRange(getReadRange(myInput));
    myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write(getWriteMode(myInput));
}
We can build a simple incremental filtering transform by putting the pieces together.
package myproject.datasets;

import com.palantir.transforms.lang.java.api.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class FilterTransform {

    private static final Logger LOG = LoggerFactory.getLogger(FilterTransform.class);

    @Compute
    public void myComputeFunction(
            @Input("/examples/students_hair_eye_color") FoundryInput myInput,
            @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) {
        Dataset<Row> inputDf = myInput.asDataFrame().readForRange(getReadRange(myInput));
        myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write(getWriteMode(myInput));
    }

    private ReadRange getReadRange(FoundryInput input) {
        switch (input.asDataFrame().modificationType()) {
            case UNCHANGED:
                LOG.info("No changes in input dataset, read only unprocessed");
                return ReadRange.UNPROCESSED;
            case APPENDED:
                LOG.info("Append-only changes in input dataset, read only unprocessed");
                return ReadRange.UNPROCESSED;
            case UPDATED:
                LOG.info("Update-type changes in input dataset, read entire view");
                return ReadRange.ENTIRE_VIEW;
            case NEW_VIEW:
                LOG.info("New view in input dataset, read entire view");
                return ReadRange.ENTIRE_VIEW;
            default:
                throw new IllegalArgumentException("Unknown ModificationType for input dataset "
                        + input.asDataFrame().modificationType());
        }
    }

    private WriteMode getWriteMode(FoundryInput input) {
        switch (input.asDataFrame().modificationType()) {
            case UNCHANGED:
                LOG.info("No changes in input dataset, writing in update mode");
                return WriteMode.UPDATE;
            case APPENDED:
                LOG.info("Append-only changes in input dataset, writing in update mode");
                return WriteMode.UPDATE;
            case UPDATED:
                LOG.info("Update-type changes in input dataset, writing in snapshot mode");
                return WriteMode.SNAPSHOT;
            case NEW_VIEW:
                LOG.info("New view in input dataset, writing in snapshot mode");
                return WriteMode.SNAPSHOT;
            default:
                throw new IllegalArgumentException("Unknown ModificationType for input dataset "
                        + input.asDataFrame().modificationType());
        }
    }
}
As introduced above, we evaluate the input modification type and read the input accordingly. We then decide whether to incrementally update the output dataset or start a new snapshot transaction.
Let's say you want to run incremental transforms most of the time but occasionally need to rebuild your dataset as a snapshot. To avoid manually hardcoding the desired result, you can add a new input and write the output with the SNAPSHOT write mode whenever this input is modified. This new input essentially acts as a snapshot trigger dataset. Note that you will also have to adapt the read range of the transform's other inputs based on the modification type of this snapshot trigger dataset, as sketched below.
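For illustration, here is a minimal sketch of such a transform. The trigger dataset path /examples/snapshot_trigger is a hypothetical example, and the sketch reuses the getReadRange and getWriteMode helpers from the full example above; any change to the trigger input forces an ENTIRE_VIEW read and a SNAPSHOT write.

@Compute
public void myComputeFunction(
        @Input("/examples/students_hair_eye_color") FoundryInput myInput,
        @Input("/examples/snapshot_trigger") FoundryInput snapshotTrigger,
        @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) {
    // Any modification to the (hypothetical) trigger dataset forces a full recompute.
    boolean forceSnapshot =
            snapshotTrigger.asDataFrame().modificationType() != DataFrameModificationType.UNCHANGED;

    ReadRange readRange = forceSnapshot ? ReadRange.ENTIRE_VIEW : getReadRange(myInput);
    WriteMode writeMode = forceSnapshot ? WriteMode.SNAPSHOT : getWriteMode(myInput);

    Dataset<Row> inputDf = myInput.asDataFrame().readForRange(readRange);
    myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write(writeMode);
}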
It is also possible to externally force a snapshot by creating an empty append transaction without provenance. However, transforms-java does not expose such functionality, and it is therefore out of the scope of this guide.
The advanced features in this section can have serious negative effects if used incorrectly. Do not use these features if you are not absolutely certain of the implications. If run without appropriate care and caution, there is a high risk of unwanted consequences. Contact your Palantir representative if you have any questions.
Advanced features are usually enabled with annotations on top of your @Compute function. However, if your transform is manually registered, you will need to add the properties to the Transform builder instead.
If an incremental build depends on an append-only dataset that grows indefinitely and there is insufficient disk space for that growth, it may become necessary to delete parts of the upstream dataset. However, this may break incrementality, because the modification of the original dataset will no longer result in an APPENDED modification type. The IncrementalOptions.IGNORE_INCREMENTAL_DELETES option avoids this by not treating deletions in the upstream dataset as breaking changes.
It is only possible to ignore incremental deletes in low-level transforms.
@Compute
@UseIncrementalOptions(IncrementalOptions.IGNORE_INCREMENTAL_DELETES)
public void myComputeFunction(
        @Input("/Users/admin/students_data") FoundryInput myInput,
        @Output("/Users/admin/students_data_filtered") FoundryOutput myOutput) {
    ...
}
If your transform is manually registered, add the property to the builder as in the following code block.
LowLevelTransform lowLevelManualTransform = LowLevelTransform.builder()
        .computeFunctionInstance(new MyLowLevelManualFunction())
        .putParameterToInputAlias("myInput", "/path/to/input/dataset")
        .putParameterToOutputAlias("myOutput", "/path/to/output/dataset")
        .ignoreIncrementalDeletes(true)
        .build();
Note that a schema modification in the input dataset may have unexpected consequences when combined with incremental transforms.
Read all of the documentation below and ensure that you understand all potential implications before using this feature.
It is only possible to ignore schema changes in low-level transforms.
If the schema of a dataset that an incremental build depends on changes, the change will result in a DataFrameModificationType.NEW_VIEW, possibly breaking incrementality. However, if the IncrementalOptions.USE_SCHEMA_MODIFICATION_TYPE option is set, a schema change will not result in a new view. Instead, the schema change in the input dataset will be interpreted as DataFrameModificationType.UNCHANGED, and the SchemaModificationType.NEW_SCHEMA flag will be set, allowing the user to explicitly handle this special case.
@Compute
@UseIncrementalOptions(IncrementalOptions.USE_SCHEMA_MODIFICATION_TYPE)
public void myComputeFunction(
        @Input("/Users/admin/students_data") FoundryInput myInput,
        @Output("/Users/admin/students_data_filtered") FoundryOutput myOutput) {
    ...
}
If your transform is manually registered, add the property to the builder as in the following code block.
LowLevelTransform lowLevelManualTransform = LowLevelTransform.builder()
        .computeFunctionInstance(new MyLowLevelManualFunction())
        .putParameterToInputAlias("myInput", "/path/to/input/dataset")
        .putParameterToOutputAlias("myOutput", "/path/to/output/dataset")
        .useSchemaModificationType(true)
        .build();
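One possible way to treat this special case inside the compute function is to fall back to a full snapshot whenever a new schema is detected, as in the sketch below. The schemaModificationType() accessor name is an assumption (check the API of your transforms-java version), and the sketch reuses the getReadRange and getWriteMode helpers from the earlier example.

// Sketch only: the schemaModificationType() accessor name is an assumption.
Dataset<Row> inputDf;
WriteMode writeMode;
if (myInput.asDataFrame().schemaModificationType() == SchemaModificationType.NEW_SCHEMA) {
    // The input schema changed: re-read the whole view and start a new snapshot.
    inputDf = myInput.asDataFrame().readForRange(ReadRange.ENTIRE_VIEW);
    writeMode = WriteMode.SNAPSHOT;
} else {
    inputDf = myInput.asDataFrame().readForRange(getReadRange(myInput));
    writeMode = getWriteMode(myInput);
}
myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write(writeMode);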
The build of the transformation will succeed or fail depending on how the transformation depends on the input dataset. More precisely, if the transformation depends on columns involved in the schema change, modifications to those columns will make the incremental transformation fail. In these cases, a new snapshot is required before incremental transforms can be used again.
The transformation depends on a certain column if:

- The column is used explicitly in the transformation logic (for example, our transformation contains filter("eye = 'Brown'"); if the column eye is renamed or deleted in the RAW dataset and we then retrigger our FilterTransform, the incremental update will fail).
- The column is carried through to the output dataset (for example, if we delete the column hair in our example RAW dataset, our FilterTransform will fail even though hair is not referenced in the filter).

If the transformation doesn't depend on the schema changes, the incremental build will succeed.
For example, if we first add a select statement for id and eye in our transformation and trigger an initial snapshot build from the RAW dataset, and then remove the column hair in the RAW dataset, the incremental build will succeed and the schema change will not have any effect on the incremental transformation.
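A minimal sketch of that variant of the compute body, reusing the getReadRange and getWriteMode helpers from the full example above:

// Selecting only the columns the transformation needs (id and eye) makes the
// incremental build robust to changes in unrelated columns such as hair.
Dataset<Row> inputDf = myInput.asDataFrame().readForRange(getReadRange(myInput));
Dataset<Row> selected = inputDf.select("id", "eye").filter("eye = 'Brown'");
myOutput.getDataFrameWriter(selected).write(getWriteMode(myInput));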
The build will also always succeed in the case of additive changes to the schema (for example, adding a new column).