The rest of this guide refers to incremental vs non-incremental builds. It is assumed that in all cases, the incremental()
decorator is being used. Thus, this terminology just refers to whether or not the transform is actually run incrementally.
The incremental()
decorator can be used to wrap a transform’s compute function with logic for enabling incremental computation:
```python
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    # type: (IncrementalTransformInput, IncrementalTransformOutput) -> None
    students_df = students.dataframe('added')
    processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
```
The incremental()
decorator can be used to wrap any existing transform that uses the transform()
, transform_df()
, or transform_pandas()
decorator. Note that the compute function for your transform must support being run both incrementally and non-incrementally. The incremental()
decorator does two key things:
- The incremental() decorator decides whether or not the transform can be run incrementally, according to the requirements described below.
- TransformInput becomes IncrementalTransformInput, TransformOutput becomes IncrementalTransformOutput, and TransformContext becomes IncrementalTransformContext. These incremental objects are then passed into the transform wrapped by the decorator.

The incremental decorator takes six arguments:
```python
transforms.api.incremental(
    require_incremental=False,
    semantic_version=1,
    snapshot_inputs=None,
    allow_retention=False,
    strict_append=False,
    v2_semantics=False
)
```
Setting the require_incremental
argument to True
makes the transform fail if it cannot run incrementally. There are two cases where the transform is still allowed to run as a snapshot, even if require_incremental=True:

- The transform has never been run before.
- The semantic_version argument has been changed since the previous run.
To debug the cause of a transform failing to run incrementally, look in the driver logs for the warning transforms.api._incremental: Not running incrementally.
The semantic_version argument on the transforms.api.incremental decorator allows you to force the next run of the transform to be non-incremental. To do so, change the value of the semantic_version argument on the @incremental() decorator.
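For example, a minimal sketch that forces the next build to run as a snapshot, reusing the student datasets from the example above (the new version number is arbitrary; only the change in value matters):

```python
from transforms.api import transform, incremental, Input, Output

# Bumping semantic_version (here from the default of 1 to 2) forces the next
# build to run non-incrementally; later builds run incrementally again.
@incremental(semantic_version=2)
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.dataframe()
    processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
```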
The snapshot_inputs
argument allows you to define some inputs as snapshot inputs which, unlike non-snapshot inputs, support update and delete modifications. See snapshot inputs for more.
Setting the allow_retention
argument to True
allows deletion of files in input and output datasets by Foundry Retention, while maintaining incrementality of your transform.
If the strict_append parameter is set to True, the underlying Foundry transaction type is set to APPEND, and an APPEND transaction will be used for incremental writes. Note that the write operation may not overwrite any files, even auxiliary ones such as Parquet summary metadata or Hadoop SUCCESS files. Incremental writes for all Foundry formats should support this mode.
If the v2_semantics
parameter is set to True
, V2 incremental semantics will be used. There should be no difference in behavior between v2 and v1 incremental semantics, and we recommend all users set this to True
. Non-Catalog input and output resources may only be read from/written to incrementally if using v2 semantics.
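As an illustrative sketch, both strict_append and v2_semantics are plain keyword arguments on the decorator; the dataset paths and the status column below are hypothetical:

```python
from transforms.api import transform, incremental, Input, Output

@incremental(strict_append=True, v2_semantics=True)
@transform(
    events=Input('/examples/events'),             # hypothetical input
    filtered=Output('/examples/events_filtered')  # hypothetical output
)
def filter_events(events, filtered):
    events_df = events.dataframe()
    # Under strict_append, incremental writes are committed as APPEND transactions.
    filtered.write_dataframe(events_df.filter(events_df.status == 'OK'))
```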
As mentioned above, the compute function for your transform wrapped with the incremental()
decorator must support being run both incrementally and non-incrementally. Default read and write modes (explained in more detail throughout the rest of this page) can assist with this dual-logic requirement, but it may still be necessary to branch based on the is_incremental
property of the compute context.
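For illustration, a sketch of such branching, assuming the compute function also requests the context through a ctx parameter:

```python
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(ctx, students, processed):
    if ctx.is_incremental:
        # Incremental run: only the rows added since the last build.
        students_df = students.dataframe('added')
    else:
        # Non-incremental run: the full input dataset.
        students_df = students.dataframe('current')
    processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
```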
Another key point is that using the incremental()
decorator with transform_df()
or transform_pandas()
only gives you access to the default read and write modes. This is sufficient if you have a transform where the added output rows are a function only of the added input rows (refer to the append example). If, however, your transform performs more complex logic (such as joins, aggregations, or distinct) that requires you to set the input read mode or the output write mode, then you should use the incremental()
decorator with transform()
. Using the incremental decorator with transform()
allows you to set the read and write modes.
Note that the Code Repositories preview feature will always run transforms in non-incremental mode. This is true even when require_incremental=True
is passed into the incremental()
decorator.
The transforms.api.IncrementalTransformInput
object extends the dataframe()
method to take an optional read mode.
The optional input read mode parameter is only available if you’re using the transform()
decorator. The transform_df()
and transform_pandas()
decorators call dataframe()
and pandas()
on the inputs, without any arguments, to extract the PySpark and Pandas DataFrame objects. This means the read mode used will always be the default added
mode.
If you define a transform using the incremental decorator, the read modes behave differently depending on whether your transform is run incrementally or non-incrementally:
Read Mode | Incremental Behavior | Non-incremental Behavior |
---|---|---|
added * | Returns a DataFrame ↗ containing any new rows appended to the input since the last time the transform ran. | Returns a DataFrame ↗ containing the entire dataset since all rows are considered unseen. |
previous | Returns a DataFrame ↗ containing the entire input given to the transform the last time it ran. | Returns an empty DataFrame ↗. |
current | Returns a DataFrame ↗ containing the entire input dataset for the current run. | Returns a DataFrame ↗ containing the entire input dataset for the current run. This will be the same as added . |
The default read mode is added
.
There are instances where it is undesirable for an input to be treated in an incremental fashion despite the transform being marked as incremental()
. See Snapshot Inputs for more information and how the read mode behavior differs for these types of inputs.
Note that the default output read mode is current
, and the available output read modes are added
, current
, and previous
. For more information about output read modes, refer to the section below.
The nature of incremental transforms means that we load all of the past transactions on the input datasets from the last SNAPSHOT
transaction to build the input view. If you begin to see progressive slowness in your incremental transform, we recommend running a SNAPSHOT build on your incremental input datasets.
The transforms.api.IncrementalTransformOutput
object provides access to read and write modes for the output dataset. The key to writing logic compatible with both incremental and non-incremental builds is the default write mode of modify
. There are two write modes:
- modify: This mode modifies the existing output with data written during the build. For example, calling write_dataframe() when the output is in modify mode will append the written DataFrame ↗ to the existing output.
- replace: This mode fully replaces the output with data written during the build.

When we say a transform is run incrementally, it means that the default write mode for the output is set to modify. Similarly, when we say a transform is run non-incrementally, it means that the default write mode for the output is set to replace.
Recall that the default read mode for input DataFrames is added
. Because of the default input read mode of added
and the default output write mode of modify
, writing logic compatible with incremental and non-incremental builds becomes much easier:
```python
@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(students, processed):
    # type: (IncrementalTransformInput, IncrementalTransformOutput) -> None
    # Read only the rows we haven't seen before.
    new_students_df = students.dataframe()  # this is equivalent to students.dataframe('added')

    # When non-incremental, we read all rows and replace the output.
    # When incremental, we read only new rows, and append them to the output.
    processed.write_dataframe(
        new_students_df.filter(new_students_df.hair == 'Brown')
    )
```
There are more complex use cases for incremental computation when it might be required to compute the correct write mode and set it manually. This can be done using the set_mode()
method on the incremental output.
The output write mode can only be set manually if you’re using the transform()
decorator. With this decorator, you can use set_mode()
before you explicitly call the write_dataframe()
method to save the output. The transform_df()
and transform_pandas()
decorators, on the other hand, call write_dataframe()
and write_pandas()
to save the DataFrame output. This means the write mode used will be determined by the incremental()
decorator.
When using set_mode(), make sure the chosen mode is valid behavior both when the transform is run incrementally and when it is run non-incrementally. If this is not the case, make use of the is_incremental property.
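For instance, a sketch of an aggregation that fully rewrites its output on every run (the output path is hypothetical); here set_mode('replace') is valid for both incremental and non-incremental builds, so no is_incremental branch is needed:

```python
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    counts=Output('/examples/hair_color_counts')  # hypothetical output
)
def count_hair_colors(students, counts):
    # Aggregations need the whole input, so read 'current' rather than the default 'added'.
    all_students = students.dataframe('current')
    # Fully rewrite the output on every run; valid whether or not the build is incremental.
    counts.set_mode('replace')
    counts.write_dataframe(all_students.groupBy('hair').count())
```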
In addition to the write mode, the transforms.api.IncrementalTransformOutput
makes it possible to read DataFrames from the output dataset. This can be done using the dataframe()
method, which again takes an optional read mode. The default read mode is current, and the other available output read modes are added and previous. The read mode behaves differently depending on what the dataset's write mode is set to.
Although the default read mode is current, in most cases you actually want to use previous. The other read modes should be used to read the dataset after writing to it.
To read data from the previous output, the transform must run in incremental mode (ctx.is_incremental is True); otherwise, the dataframe will be empty.
Output Read Mode | Output Write Mode | Has new data been written yet? | Behavior |
---|---|---|---|
current | modify | No | dataframe() will return the previous output of the transform. |
current | modify | Yes | dataframe() will return the previous output of the transform plus data written to output in currently running build. |
current | replace | No | These settings are invalid and may lead to unexpected behavior. If you want to merge and replace a previous output with a new input that has a potentially different schema, see Merge and replace with schema change Example. |
current | replace | Yes | dataframe() will return data written to output in currently running build. |
added | modify /replace | No | There is no use case for these settings. Use previous mode instead. |
added | modify /replace | Yes | dataframe() will return data written to output in currently running build. |
previous | modify | Yes/No | dataframe() will return the previous output of the transform. A schema is a required field when reading with previous. |
previous | replace | Yes/No | dataframe() will return the previous output of the transform. A schema is a required field when reading with previous. |
Note that the evaluation of a read mode with the .dataframe() call is lazy (call-by-need), meaning that evaluation is delayed until the value is needed. The read is resolved according to the write mode of the dataset at the time of the write_dataframe call, so write modes set earlier are ignored. Calling .localCheckpoint(eager=True) forces the data to be read and the output write mode to be evaluated at that point, and never recomputed.
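A sketch of that interaction, assuming the transform runs incrementally (see the note on current mode's fragility below) and we want to merge the previous output with new input rows; the eager checkpoint pins the 'current' view before the write mode is switched to replace:

```python
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def merge_and_rewrite(students, processed):
    # Without the eager checkpoint, this read would be resolved using whatever
    # write mode is in effect when write_dataframe() is eventually called.
    previous = processed.dataframe('current').localCheckpoint(eager=True)

    new_students = students.dataframe()
    processed.set_mode('replace')
    processed.write_dataframe(
        previous.unionByName(new_students.filter(new_students.hair == 'Brown'))
    )
```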
When using current to get the previous dataframe, you don't have to provide a schema. This is because current uses the schema of the output, which must already have been built.

However, current mode is more fragile than previous. The current mode will fail if:

- you do not set the write_mode to modify before calling dataframe on the output, or
- the transform is run non-incrementally, since it cannot return an empty DataFrame ↗ because the schema is not known.

The schema ↗ you provide when reading the previous dataframe will be compared against the actual schema of the last output. If the column types, column nullability, or order of the columns don't match, an exception will be raised. To make sure the order of the columns stays the same, use the following construct:
```python
previous = out.dataframe('previous', schema)  # schema is a pyspark.sql.types.StructType

out.write_dataframe(df.select(schema.fieldNames()))
```
Foundry saves all columns as nullable, regardless of the schema used in your transform. As a result, your build will fail with a SchemaMismatchError
when reading from the output in previous
mode if you supply a schema with some fields set to non-nullable.
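Continuing the construct above, a sketch of a compatible schema (the column names are illustrative) with every field declared nullable:

```python
from pyspark.sql import types as T

# All fields are declared nullable=True to match how Foundry persists the output.
schema = T.StructType([
    T.StructField('id', T.StringType(), nullable=True),
    T.StructField('hair', T.StringType(), nullable=True),
    T.StructField('eye', T.StringType(), nullable=True),
])

previous = out.dataframe('previous', schema)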
See the Merge and replace example for more information.
Output Read Mode | Output Write Mode | Has new data been written yet? |
---|---|---|
current or added | modify / replace | Yes |
Prefer added
as it makes your intentions clearer.
A scenario that benefits from reading the data written by the current transformation is running checks on the data and failing the build if the checks don't pass. This way, we don't have to recompute the data or cache it in Spark to run the checks.
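As a sketch of that pattern (the check itself is illustrative), the output is read back in added mode after the write, and the build fails if the check does not pass:

```python
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_and_check(students, processed):
    students_df = students.dataframe()
    processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))

    # Read back only what this build wrote and fail fast if it violates the check.
    written = processed.dataframe('added')
    if written.filter(written.hair != 'Brown').count() > 0:
        raise ValueError('Unexpected hair color in output written by this build')
```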
Compared to the TransformContext
object, the IncrementalTransformContext
object provides an additional property: is_incremental
. This property is set to True
if the transform is run incrementally. This means:

- the default write mode for the output is set to modify, and
- the default read mode for inputs is set to added.

The incremental decorator lets you access the previous inputs and outputs of the transform by specifying read mode “previous”. This way, you can base the current build on historical context. If the transform is run in snapshot mode, the “previous” dataframes will be empty, either because this is the first run or because the logic or data changed significantly, necessitating a recompute.
However, the most common case is to use “added” mode for inputs and “modify” mode for outputs. These modes are used by default. They let you retrieve the newly added rows from an input dataset, process them, and append them to an output dataset.
Instead of appending rows to the output, you may want to modify some existing rows already present in an output dataset. For that, use the “replace” mode as demonstrated in the examples for common scenarios.
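For example, a sketch of such an upsert, assuming a hypothetical id key column on the student datasets; previous rows whose key reappears in the new input are replaced rather than duplicated:

```python
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def upsert_students(ctx, students, processed):
    new_students = students.dataframe()

    if ctx.is_incremental:
        # Pin the previous output before switching the write mode to 'replace'.
        previous = processed.dataframe('current').localCheckpoint(eager=True)
        # Keep only previous rows whose id is not being updated in this run.
        kept = previous.join(new_students.select('id'), on='id', how='left_anti')
        new_students = kept.unionByName(new_students)

    processed.set_mode('replace')
    processed.write_dataframe(new_students)
```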
Let's analyze an incremental transform that filters students to only include students with brown hair:
```python
@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.dataframe()
    processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
```
Suppose the /examples/students_hair_eye_color
input dataset is fully replaced with a new set of students. As we can see, appending the new set of students to the previous output results in an incorrect output dataset. This is an example of a situation where the incremental()
decorator would decide to not run the transform incrementally.
For a transform to be run incrementally, the requirements described in the remainder of this section must be met.

If a transform has the incremental() decorator but any of these requirements are not met, the transform will automatically be run non-incrementally.
This means the default output write mode will be set to replace instead of modify, and inputs will be presented non-incrementally. It also means that reading from the output in the transform will return an empty dataframe, because the previous history is not accessible. If we set require_incremental=True, the transform will fail rather than running non-incrementally.
It is often desirable to allow certain inputs to be fully rewritten without affecting the ability to run the transform incrementally. See Snapshot Inputs for more information.
It is possible to force a transform to only run incrementally (unless it has never been run before or the semantic version was bumped) with the require_incremental=True
argument passed into the incremental
decorator. If the transform cannot run incrementally it will deliberately fail rather than attempt to run non-incrementally.
A transform can be run incrementally if all its incremental inputs had only files added to them (via APPEND
or UPDATE
transactions) since the last run.
Conversely, a transform cannot be run incrementally if any of its incremental inputs:

- had files rewritten via SNAPSHOT transactions, or
- had files modified or removed via UPDATE or DELETE transactions.

For instance, if the list of students in students_hair_eye_color completely changes, the previous output of filtered students is invalid and must be replaced.
If an upstream dataset grows indefinitely and you want to be able to delete old rows (using Foundry Retention) without affecting incrementality of downstream computations, the incremental transform depending on that dataset must be explicitly set to allow retained input. This can be done by using the allow_retention
argument of the transforms.api.incremental
decorator.
- If set to True, all deletions coming from Foundry Retention will be ignored when evaluating whether the inputs preserve incrementality. This means that removed inputs coming from Retention will not compromise incrementality, and that if the only non-added inputs are inputs with retained rows, the transform will still run incrementally.
- If set to False (default), any removed-type changes in the input dataset will cause the transform to run as a snapshot.

```python
@incremental(allow_retention=True)
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.dataframe()
    processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
```
In the above example, if the transform is run after a set of changes in the dataset /examples/students_hair_eye_color
that includes only added
changes and removed
changes made using Foundry Retention, the transform will run incrementally. If any removed
changes made in other ways or any modified
changes are present, a snapshot will be triggered.
Specifying allow_retention=True
only prevents effects on incrementality from removed
changes that come from Foundry Retention. Any other delete in the input dataset would still cause the transform to run a snapshot instead of incremental computation.
There are scenarios in which it is allowed for inputs to be fully rewritten without invalidating the incrementality of the transform. For example, suppose you have a simple reference dataset that maps phone number country code to country and this is periodically rewritten. Changes to this dataset do not necessarily invalidate the results of any previous computation and therefore should not prevent the transform being run incrementally.
By default, as described above, a transform cannot be run incrementally if any input has been fully rewritten since the transform was last run. Snapshot inputs are excluded from this check, and their start transaction is allowed to differ between runs.
Snapshot inputs can be configured by using the snapshot_inputs
argument on the incremental()
decorator.
```python
@incremental(snapshot_inputs=['country_codes'])
@transform(
    phone_numbers=Input('/examples/phone_numbers'),
    country_codes=Input('/examples/country_codes'),
    output=Output('/examples/phone_numbers_to_country')
)
def map_phone_number_to_country(phone_numbers, country_codes, output):
    # type: (TransformInput, TransformInput, TransformOutput) -> None
    # this will be all unseen phone numbers since the previous run
    phone_numbers = phone_numbers.dataframe()

    # this will be all country codes, regardless of what has been seen previously
    country_codes = country_codes.dataframe()

    cond = [phone_numbers.country_code == country_codes.code]
    output.write_dataframe(phone_numbers.join(country_codes, on=cond, how='left_outer'))
```
The behavior of snapshot inputs is identical whether a transform runs incrementally or non-incrementally. As such, the added and current read modes will always return the entire dataset. All other read modes will return an empty dataset.
Given that there are no constraints around previously-seen versions of snapshot inputs, it is possible to add or remove snapshot inputs while retaining the ability to run the transform incrementally. Remember that if the modification of the inputs fundamentally changes the semantics of the transform, it is worth reviewing whether the semantic_version
argument on the incremental()
decorator should be updated.
The list of existing inputs can be modified. Incrementality will be preserved in the case where either:
We also have a requirement that the start transactions for each of the non-snapshot input datasets are consistent with those used for the previous run.
For multi-output incremental transforms, the last committed transaction on each of the outputs must have been generated from the same transform.
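As an illustrative sketch (the output paths are hypothetical), a single incremental transform writing two outputs satisfies this requirement, because the last transaction on each output comes from the same transform:

```python
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    brown=Output('/examples/hair_color_brown'),  # hypothetical output
    other=Output('/examples/hair_color_other')   # hypothetical output
)
def split_by_hair_color(students, brown, other):
    students_df = students.dataframe()
    brown.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
    other.write_dataframe(students_df.filter(students_df.hair != 'Brown'))
```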
A transform can be run incrementally if and only if all its incremental inputs only had files appended to them, or where files were deleted, those files were deleted using Foundry Retention with allow_retention=True
. Snapshot inputs are excluded from this check.