Incremental Transforms Reference

Incremental decorator

Tip

The rest of this guide refers to incremental vs non-incremental builds. It is assumed that in all cases, the incremental() decorator is being used. Thus, this terminology just refers to whether or not the transform is actually run incrementally.

The incremental() decorator can be used to wrap a transform’s compute function with logic for enabling incremental computation:

    from transforms.api import transform, incremental, Input, Output


    @incremental()
    @transform(
        students=Input('/examples/students_hair_eye_color'),
        processed=Output('/examples/hair_eye_color_processed')
    )
    def filter_hair_color(students, processed):
        # type: (IncrementalTransformInput, IncrementalTransformOutput) -> None
        students_df = students.dataframe('added')
        processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))

The incremental() decorator can be used to wrap any existing transform that uses the transform(), transform_df(), or transform_pandas() decorator. Note that the compute function for your transform must support being run both incrementally and non-incrementally. The incremental() decorator does two key things:

  • It allows the transform to look up information about its previous build. Using this information, the incremental() decorator then decides whether or not the transform can be run incrementally according to the requirements described below.
  • It converts the input, output, and context objects into the incremental subclasses that provide additional functionality. Specifically, TransformInput becomes IncrementalTransformInput, TransformOutput becomes IncrementalTransformOutput, and TransformContext becomes IncrementalTransformContext. These incremental objects are then passed into the transform wrapped by the decorator.

The incremental decorator takes six arguments:

    transforms.api.incremental(
        require_incremental=False,
        semantic_version=1,
        snapshot_inputs=None,
        allow_retention=False,
        strict_append=False,
        v2_semantics=False
    )

Setting the require_incremental argument to True makes the transform fail if it cannot run incrementally. There are two cases where the transform is allowed to run as a snapshot, even if require_incremental=True:

  1. One of the outputs has never been built before.
  2. The semantic version has changed, meaning a snapshot was explicitly requested.

To debug why a transform failed to run incrementally, look in the driver logs for the warning "transforms.api._incremental: Not running incrementally".
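
The require_incremental rules above can be summarized as a small decision function. This is a plain-Python sketch for illustration only, not Foundry's implementation: resolve_run_mode and all of its parameter names are hypothetical and are not part of transforms.api.

```python
def resolve_run_mode(requirements_met, require_incremental=False,
                     output_never_built=False, semantic_version_changed=False):
    """Return 'incremental' or 'snapshot', or raise if a snapshot is not allowed.

    Hypothetical model of the documented rules; not part of transforms.api.
    """
    # The two cases in which a snapshot is permitted even when
    # require_incremental=True.
    if output_never_built or semantic_version_changed:
        return 'snapshot'
    if requirements_met:
        return 'incremental'
    if require_incremental:
        raise RuntimeError(
            'Transform cannot run incrementally and require_incremental=True')
    return 'snapshot'
```

In this model, a call such as resolve_run_mode(False, require_incremental=True) raises, while the same call with output_never_built=True falls back to a snapshot.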

The semantic_version argument on the incremental() decorator allows you to force the next run of the transform to be non-incremental.

  • If the semantic version of the current run differs from the semantic version of the previous run, the transform will run non-incrementally.
  • If not specified, the semantic version is set to 1.
  • If the semantic version of the previous run does not exist (e.g. when converting an existing transform to an incremental transform), a value of 1 is assumed. This allows the transform to start running incrementally without requiring a new snapshot.
  • To force a subsequent run of the transform to be non-incremental, you can bump the semantic_version argument on the @incremental() decorator.
    • Note that when bumping semantic versions, only integers should be used.
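
The semantic version comparison described above can be sketched as follows. This is an illustrative model only: semantic_version_changed and DEFAULT_SEMANTIC_VERSION are hypothetical names, and a previous version of None is used here to stand for a previous run that recorded no semantic version.

```python
DEFAULT_SEMANTIC_VERSION = 1


def semantic_version_changed(previous_version,
                             current_version=DEFAULT_SEMANTIC_VERSION):
    """Return True if the version change should force a non-incremental run.

    previous_version is None when the previous run has no recorded semantic
    version, in which case the default of 1 is assumed.
    """
    if previous_version is None:
        previous_version = DEFAULT_SEMANTIC_VERSION
    return previous_version != current_version
```

Under this model, converting an existing transform to incremental without setting semantic_version does not force a snapshot, whereas bumping the version from 1 to 2 does.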

The snapshot_inputs argument allows you to define some inputs as snapshot inputs which, unlike non-snapshot inputs, support update and delete modifications. See snapshot inputs for more.

Setting the allow_retention argument to True allows deletion of files in input and output datasets by Foundry Retention, while maintaining incrementality of your transform.

If the strict_append parameter is set to True and the input datasets are incremental, the underlying Foundry transaction type is set to APPEND, and an APPEND transaction will be used for incremental writes. In this case, trying to overwrite an existing file will lead to an exception. Note that the write operation may not overwrite any files, even auxiliary ones such as Parquet summary metadata or Hadoop SUCCESS files. If the input datasets are not incremental, the transform will run as a SNAPSHOT, and trying to overwrite an existing file will succeed. Use require_incremental=True to ensure the code runs incrementally as an APPEND. Incremental writes for all Foundry formats should support strict_append mode.

If the v2_semantics parameter is set to True, V2 incremental semantics will be used. There should be no difference in behavior between v2 and v1 incremental semantics, and we recommend all users set this to True. Non-Catalog input and output resources may only be read from/written to incrementally if using v2 semantics.

Important information

As mentioned above, the compute function for your transform wrapped with the incremental() decorator must support being run both incrementally and non-incrementally. Default read and write modes (explained in more detail throughout the rest of this page) can assist with this dual-logic requirement, but it may still be necessary to branch based on the is_incremental property of the compute context.

Another key point is that using the incremental() decorator with transform_df() or transform_pandas() only gives you access to the default read and write modes. This is sufficient if you have a transform where the added output rows are a function only of the added input rows (refer to the append example). If, however, your transform performs more complex logic (such as joins, aggregations, or distinct) that requires you to set the input read mode or the output write mode, then you should use the incremental() decorator with transform(). Using the incremental decorator with transform() allows you to set the read and write modes.

Warning

Note that the Code Repositories preview feature will always run transforms in non-incremental mode. This is true even when require_incremental=True is passed into the incremental() decorator.

Incremental modes of Inputs and Outputs

IncrementalTransformInput

The transforms.api.IncrementalTransformInput object extends the dataframe() method to take an optional read mode.

The optional input read mode parameter is only available if you’re using the transform() decorator. The transform_df() and transform_pandas() decorators call dataframe() and pandas() on the inputs, without any arguments, to extract the PySpark and Pandas DataFrame objects. This means the read mode used will always be the default added mode.

If you define a transform using the incremental decorator, the read modes behave differently depending on whether your transform is run incrementally or non-incrementally:

| Read mode | Incremental behavior | Non-incremental behavior |
|---|---|---|
| added (default) | Returns a DataFrame containing any new rows appended to the input since the last time the transform ran. | Returns a DataFrame containing the entire dataset, since all rows are considered unseen. |
| previous | Returns a DataFrame containing the entire input given to the transform the last time it ran. | Returns an empty DataFrame. |
| current | Returns a DataFrame containing the entire input dataset for the current run. | Returns a DataFrame containing the entire input dataset for the current run. This will be the same as added. |

The default read mode is added.
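
The read mode table can be modeled with plain Python lists standing in for DataFrames. This is a sketch for intuition only: read_input and its parameters are hypothetical and do not exist in transforms.api.

```python
def read_input(mode, is_incremental, previous_rows, appended_rows):
    """Model the rows a non-snapshot input returns for each read mode.

    previous_rows: rows the input contained the last time the transform ran.
    appended_rows: rows appended to the input since then.
    """
    everything = previous_rows + appended_rows
    if mode == 'current':
        return everything
    if mode == 'added':
        # Non-incremental runs treat every row as unseen.
        return appended_rows if is_incremental else everything
    if mode == 'previous':
        return previous_rows if is_incremental else []
    raise ValueError('unknown read mode: %r' % (mode,))
```

For example, with two previously seen rows and one appended row, the added mode yields one row on an incremental run and all three rows on a non-incremental run.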

There are instances where it is undesirable for an input to be treated in an incremental fashion despite the transform being marked as incremental(). See Snapshot Inputs for more information and how the read mode behavior differs for these types of inputs.

Note that the default output read mode is current, and the available output read modes are added, current, and previous. For more information about output read modes, refer to the section below.

The nature of incremental transforms means that we load all of the past transactions on the input datasets from the last SNAPSHOT transaction to build the input view. If you begin to see progressive slowness in your incremental transform, we recommend running a SNAPSHOT build on your incremental input datasets.

IncrementalTransformOutput

The transforms.api.IncrementalTransformOutput object provides access to read and write modes for the output dataset. The key to writing logic compatible with both incremental and non-incremental builds is the default write mode of modify. There are two write modes:

  • modify: This mode modifies the existing output with data written during the build. For example, calling write_dataframe() when the output is in modify mode will append the written DataFrame to the existing output.
  • replace: This mode fully replaces the output with data written during the build.

When we say a transform is run incrementally, it means that the default write mode for the output is set to modify. Similarly, when we say a transform is run non-incrementally, it means that the default write mode for the output is set to replace.

Recall that the default read mode for input DataFrames is added. Because of the default input read mode of added and the default output write mode of modify, writing logic compatible with incremental and non-incremental builds becomes much easier:

    from transforms.api import transform, incremental, Input, Output


    @incremental()
    @transform(
        students=Input('/examples/students_hair_eye_color'),
        processed=Output('/examples/hair_eye_color_processed')
    )
    def incremental_filter(students, processed):
        # type: (IncrementalTransformInput, IncrementalTransformOutput) -> None
        # Read only the rows we haven't seen before.
        new_students_df = students.dataframe()  # this is equivalent to students.dataframe('added')

        # When non-incremental, we read all rows and replace the output.
        # When incremental, we read only new rows, and append them to the output.
        processed.write_dataframe(
            new_students_df.filter(new_students_df.hair == 'Brown')
        )
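
To see why the same compute logic is correct in both cases, the two default modes can be simulated with plain Python lists. This is an illustrative sketch under simplifying assumptions: run_filter and the sample rows are hypothetical, and lists of dicts stand in for DataFrames.

```python
def run_filter(existing_output, input_rows, is_incremental):
    """Model one build of the brown-hair filter under the default modes.

    input_rows is what the default 'added' read mode would return: all rows
    on a non-incremental run, only newly appended rows on an incremental run.
    """
    written = [row for row in input_rows if row['hair'] == 'Brown']
    if is_incremental:
        # Default write mode 'modify': append to the existing output.
        return existing_output + written
    # Default write mode 'replace': overwrite the output entirely.
    return written


first_batch = [{'name': 'Ann', 'hair': 'Brown'}, {'name': 'Bob', 'hair': 'Black'}]
second_batch = [{'name': 'Cy', 'hair': 'Brown'}]

# First build is non-incremental; second build sees only the appended rows.
output = run_filter([], first_batch, is_incremental=False)
output = run_filter(output, second_batch, is_incremental=True)
names = [row['name'] for row in output]
```

After both builds, the simulated output holds the brown-haired students from both batches, which matches what a single non-incremental build over all rows would produce.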

There are more complex use cases for incremental computation when it might be required to compute the correct write mode and set it manually. This can be done using the set_mode() method on the incremental output.

The output write mode can only be set manually if you’re using the transform() decorator. With this decorator, you can use set_mode() before you explicitly call the write_dataframe() method to save the output. The transform_df() and transform_pandas() decorators, on the other hand, call write_dataframe() and write_pandas() to save the DataFrame output. This means the write mode used will be determined by the incremental() decorator.

Warning

When using set_mode(), it is worth ensuring that this is valid behavior both when the transform is run incrementally and when it is run non-incrementally. If this is not the case, you should make use of the is_incremental property.

In addition to the write mode, the transforms.api.IncrementalTransformOutput makes it possible to read DataFrames from the output dataset. This can be done using the dataframe() method, which again takes in an optional read mode. Default read mode is set to current and other available output read modes are added and previous. Read mode behaves differently depending on what the dataset’s write mode is set to.

Tip

Although the default read mode is current, in most cases you actually want to use previous. The other read modes should be used to read the dataset after writing to it.

Reading data from the previous run, valid combinations:

To read data from the previous output the transform must run in incremental mode (ctx.is_incremental is True), otherwise the dataframe will be empty.

| Output read mode | Output write mode | Has new data been written yet? | Behavior |
|---|---|---|---|
| current | modify | No | dataframe() will return the previous output of the transform. |
| current | modify | Yes | dataframe() will return the previous output of the transform plus data written to the output in the currently running build. |
| current | replace | No | These settings are invalid and may lead to unexpected behavior. If you want to merge and replace a previous output with a new input that has a potentially different schema, see the Merge and replace with schema change example. |
| current | replace | Yes | dataframe() will return data written to the output in the currently running build. |
| added | modify/replace | No | There is no use case for these settings. Use previous mode instead. |
| added | modify/replace | Yes | dataframe() will return data written to the output in the currently running build. |
| previous | modify | Yes/No | dataframe() will return the previous output of the transform. A schema is required when reading with previous. |
| previous | replace | Yes/No | dataframe() will return the previous output of the transform. A schema is required when reading with previous. |
Warning

Note that the evaluation of read mode with the .dataframe() call is lazy (call-by-need), meaning that evaluation is delayed until the value is needed. The output of the read of the dataframe is evaluated according to the write mode of the dataset during the write_dataframe call, such that previous write modes are ignored. Calling .localCheckpoint(eager=True) forces the data to be read and the output write mode to be evaluated at that point, and never recomputed.

When using current to get the previous dataframe, you don't have to provide a schema. This is because current uses the schema of the output, which must already have been built. However, current mode is more fragile than previous. The current mode will fail if:

  • the transform is run non-incrementally and you don't override the write_mode to modify before calling dataframe on the output
  • the transform has never been computed before, so it’s not possible to construct an empty DataFrame because the schema is not known.

The schema you provide when reading the previous dataframe will be compared against the actual schema of the last output. If the column types, column nullability, or order of the columns don't match, an exception will be raised. To make sure the order of the columns stays the same, use the following construct:

    previous = out.dataframe('previous', schema)  # schema is a pyspark.sql.types.StructType

    out.write_dataframe(df.select(schema.fieldNames()))

Foundry saves all columns as nullable, regardless of the schema used in your transform. As a result, your build will fail with a SchemaMismatchError when reading from the output in previous mode if you supply a schema with some fields set to non-nullable.

See the Merge and replace example for more information.

Reading data written in the current run, valid combinations:

| Output read mode | Output write mode | Has new data been written yet? |
|---|---|---|
| current or added | modify / replace | Yes |

Prefer added, as it makes your intentions clearer. One scenario that benefits from reading the data written by the current transformation is running checks on the data and failing the build if the checks don't pass. This way we don't have to recompute the data or cache it in Spark to run the checks.
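
The output read and write mode combinations from the tables above can be modeled as a lookup over plain Python lists. This is a hedged sketch, not Foundry's implementation: read_output and its parameters are hypothetical, and the combination listed as invalid (current with replace before anything is written) is not given any special treatment here.

```python
def read_output(read_mode, write_mode, previous_output, written_so_far):
    """Model what dataframe() on the output returns for each combination.

    previous_output: rows from the previous build (empty when non-incremental).
    written_so_far: rows written during the current build (empty before a write).
    """
    if write_mode not in ('modify', 'replace'):
        raise ValueError('unknown write mode: %r' % (write_mode,))
    if read_mode == 'previous':
        return list(previous_output)
    if read_mode == 'added':
        return list(written_so_far)
    if read_mode == 'current':
        if write_mode == 'modify':
            return list(previous_output) + list(written_so_far)
        # 'replace': only data written in the current build is visible.
        return list(written_so_far)
    raise ValueError('unknown read mode: %r' % (read_mode,))
```

In this model, added and previous give the same answer regardless of write mode, which is one reason they state intent more clearly than current.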

IncrementalTransformContext

Compared to the TransformContext object, the IncrementalTransformContext object provides an additional property: is_incremental. This property is set to True if the transform is run incrementally, which means that:

  • the default output write mode is set to modify, and
  • the default read mode for inputs is set to added.

Summary of incremental modes

The incremental decorator lets you access the previous inputs and outputs of the transform by specifying read mode “previous”. This way you can base the current build on historical context. If the transform is run in snapshot mode, the “previous” dataframes will be empty because this is the first run, or because the logic or data changed significantly necessitating a recompute.

However, the most common case is to use “added” mode for inputs and “modify” mode for outputs. These modes are used by default. They let you retrieve the newly added rows from an input dataset, process them, and append them to an output dataset.

Instead of appending rows to the output, you may want to modify some existing rows already present in an output dataset. For that, use the “replace” mode as demonstrated in the examples for common scenarios.

Requirements for incremental computation

Let's analyze an incremental transform that filters students to only include students with brown hair:

    from transforms.api import transform, incremental, Input, Output


    @incremental()
    @transform(
        students=Input('/examples/students_hair_eye_color'),
        processed=Output('/examples/hair_eye_color_processed')
    )
    def filter_hair_color(students, processed):
        students_df = students.dataframe()
        processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))

Suppose the /examples/students_hair_eye_color input dataset is fully replaced with a new set of students. Appending the filtered new students to the previous output would produce an incorrect output dataset, because the output would still contain students who are no longer in the input. This is an example of a situation where the incremental() decorator would decide not to run the transform incrementally. For a transform to be run incrementally, the requirements described in the rest of this section must be met.

If a transform has the incremental() decorator but any of these requirements is not met, the transform will automatically be run non-incrementally. This means the default output write mode will be set to replace instead of modify, and inputs will be presented non-incrementally. It also means that reading from the output in the transform will return an empty dataframe, because the previous history is not accessible. If we set require_incremental=True, the transform will fail rather than running non-incrementally.

It is often desirable to allow certain inputs to be fully rewritten without affecting the ability to run the transform incrementally. See Snapshot Inputs for more information.

Tip

It is possible to force a transform to only run incrementally (unless it has never been run before or the semantic version was bumped) with the require_incremental=True argument passed into the incremental decorator. If the transform cannot run incrementally it will deliberately fail rather than attempt to run non-incrementally.

Append-only input changes

A transform can be run incrementally if all its incremental inputs had only files added to them (via APPEND or UPDATE transactions) since the last run.

Conversely, a transform cannot be run incrementally if any of its incremental inputs:

  • has been fully rewritten, e.g. had SNAPSHOT transactions, or
  • has had existing files updated or deleted, through UPDATE or DELETE transactions.

For instance, if the list of students in students_hair_eye_color completely changes, the previous output of filtered students is invalid and must be replaced.
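
The append-only rule can be sketched as a check over the transactions committed to an incremental input since the last run. This is a simplified model with hypothetical names: is_append_only is not a Foundry function, and the only_adds_files flag is an assumption used here to distinguish UPDATE transactions that only add files from those that touch existing files.

```python
def is_append_only(transactions):
    """Return True if every transaction only added files to the input.

    transactions: list of (transaction_type, only_adds_files) tuples, where
    transaction_type is 'SNAPSHOT', 'APPEND', 'UPDATE', or 'DELETE'.
    """
    for transaction_type, only_adds_files in transactions:
        if transaction_type == 'APPEND':
            continue
        if transaction_type == 'UPDATE' and only_adds_files:
            continue
        # SNAPSHOT, DELETE, or an UPDATE that changed existing files.
        return False
    return True
```

A history of APPEND transactions keeps the transform incremental in this model, while a single SNAPSHOT or DELETE on the input does not.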

Inputs with deletions coming from Foundry Retention

If an upstream dataset grows indefinitely and you want to be able to delete old rows (using Foundry Retention) without affecting incrementality of downstream computations, the incremental transform depending on that dataset must be explicitly set to allow retained input. This can be done by using the allow_retention argument of the transforms.api.incremental decorator.

  • If this field is set to True, all deletions coming from Foundry Retention will be ignored when evaluating if the inputs preserve incrementality. This means that removed inputs coming from Retention will not compromise incrementality, and that if the only non-added inputs are inputs with retained rows, the transform will still run incrementally.
  • If the field is False (default), any removed-type changes in the input dataset will cause the transform to run a snapshot.

    from transforms.api import transform, incremental, Input, Output


    @incremental(allow_retention=True)
    @transform(
        students=Input('/examples/students_hair_eye_color'),
        processed=Output('/examples/hair_eye_color_processed')
    )
    def filter_hair_color(students, processed):
        students_df = students.dataframe()
        processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))

In the above example, if the transform is run after a set of changes in the dataset /examples/students_hair_eye_color that includes only added changes and removed changes made using Foundry Retention, the transform will run incrementally. If any removed changes made in other ways or any modified changes are present, a snapshot will be triggered.

Warning

Specifying allow_retention=True only prevents effects on incrementality from removed changes that come from Foundry Retention. Any other delete in the input dataset would still cause the transform to run a snapshot instead of incremental computation.
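
The effect of allow_retention on this check can be modeled as follows. This is an illustrative sketch only: preserves_incrementality and the (kind, from_retention) representation of input changes are hypothetical and do not correspond to a real Foundry API.

```python
def preserves_incrementality(changes, allow_retention=False):
    """Return True if the input changes still allow an incremental run.

    changes: list of (kind, from_retention) tuples, where kind is 'added',
    'modified', or 'removed', and from_retention marks removals made by
    Foundry Retention.
    """
    for kind, from_retention in changes:
        if kind == 'added':
            continue
        if kind == 'removed' and from_retention and allow_retention:
            # Retention deletions are ignored only when explicitly allowed.
            continue
        return False
    return True
```

With allow_retention=True, a mix of added changes and Retention removals remains incremental in this model, while a removal from any other source still triggers a snapshot.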

Snapshot inputs

There are scenarios in which it is allowed for inputs to be fully rewritten without invalidating the incrementality of the transform. For example, suppose you have a simple reference dataset that maps phone number country code to country and this is periodically rewritten. Changes to this dataset do not necessarily invalidate the results of any previous computation and therefore should not prevent the transform being run incrementally.

By default, as described above, a transform cannot be run incrementally if any input has been fully rewritten since the transform was last run. Snapshot inputs are excluded from this check, and their start transactions are allowed to differ between runs.

Snapshot inputs can be configured by using the snapshot_inputs argument on the incremental() decorator.

    from transforms.api import transform, incremental, Input, Output


    @incremental(snapshot_inputs=['country_codes'])
    @transform(
        phone_numbers=Input('/examples/phone_numbers'),
        country_codes=Input('/examples/country_codes'),
        output=Output('/examples/phone_numbers_to_country')
    )
    def map_phone_number_to_country(phone_numbers, country_codes, output):
        # type: (TransformInput, TransformInput, TransformOutput) -> None

        # this will be all unseen phone numbers since the previous run
        phone_numbers = phone_numbers.dataframe()

        # this will be all country codes, regardless of what has been seen previously
        country_codes = country_codes.dataframe()

        cond = [phone_numbers.country_code == country_codes.code]
        output.write_dataframe(phone_numbers.join(country_codes, on=cond, how='left_outer'))

The behavior of snapshot inputs is identical whether a transform runs incrementally or non-incrementally. As such, the added and current read modes will always return the entire dataset. All other read modes will return an empty dataset.
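
The read behavior of a snapshot input can be modeled in a few lines. This is a sketch with a hypothetical function name, read_snapshot_input, using a plain list in place of a DataFrame; note that it takes no is_incremental argument because the result does not depend on it.

```python
def read_snapshot_input(mode, all_rows):
    """Model the rows a snapshot input returns for a given read mode."""
    if mode in ('added', 'current'):
        # Snapshot inputs always expose the entire dataset.
        return list(all_rows)
    # Any other read mode, such as 'previous', yields an empty dataset.
    return []
```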

Given that there are no constraints around previously-seen versions of snapshot inputs, it is possible to add or remove snapshot inputs while retaining the ability to run the transform incrementally. Remember that if the modification of the inputs fundamentally changes the semantics of the transform, it is worth reviewing whether the semantic_version argument on the incremental() decorator should be updated.

Changes to Inputs

The list of existing inputs can be modified. Incrementality will be preserved in the case where either:

  • New inputs or new snapshot inputs are added, or
  • Existing inputs or existing snapshot inputs are removed. Note that an incremental transform must have at least one input.

We also have a requirement that the start transactions for each of the non-snapshot input datasets are consistent with those used for the previous run.

Outputs last built by same transform

For multi-output incremental transforms, the last committed transaction on each of the outputs must have been generated from the same transform.

Summary of requirements for incremental computation

A transform can be run incrementally if and only if all its incremental inputs only had files appended to them, or where files were deleted, those files were deleted using Foundry Retention with allow_retention=True. Snapshot inputs are excluded from this check.