This section contains a wide range of examples of incrementally computable transforms.

The examples make use of two inputs to demonstrate incremental computation: students and students_updated. The students input contains 3 students and is not incremental, which means it has no history:
>>> students.dataframe('previous').sort('id').show()
+---+----+---+---+
| id|hair|eye|sex|
+---+----+---+---+
+---+----+---+---+
>>>
>>> students.dataframe('current').sort('id').show()
+---+-----+-----+------+
| id| hair| eye| sex|
+---+-----+-----+------+
| 1|Brown|Green|Female|
| 2| Red| Blue| Male|
| 3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> students.dataframe('added').sort('id').show()
+---+-----+-----+------+
| id| hair| eye| sex|
+---+-----+-----+------+
| 1|Brown|Green|Female|
| 2| Red| Blue| Male|
| 3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> # Recall that the default read mode for inputs is 'added'
>>> students.dataframe('added') is students.dataframe()
True
The students_updated input is the same as students but with an additional update that contains three extra students. This update makes the input incremental. Therefore, it has a non-empty previous DataFrame:
>>> students_updated.dataframe('previous').sort('id').show()
+---+-----+-----+------+
| id| hair| eye| sex|
+---+-----+-----+------+
| 1|Brown|Green|Female|
| 2| Red| Blue| Male|
| 3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> students_updated.dataframe('current').sort('id').show()
+---+-----+-----+------+
| id| hair| eye| sex|
+---+-----+-----+------+
| 1|Brown|Green|Female|
| 2| Red| Blue| Male|
| 3|Blond|Hazel|Female|
| 4|Brown|Green|Female|
| 5|Brown| Blue| Male|
| 6|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> students_updated.dataframe('added').sort('id').show()
+---+-----+-----+------+
| id| hair| eye| sex|
+---+-----+-----+------+
| 4|Brown|Green|Female|
| 5|Brown| Blue| Male|
| 6|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> # Recall that the default read mode for inputs is 'added'
>>> students_updated.dataframe('added') is students_updated.dataframe()
True
An append-only incremental computation is one where the added output rows are a function only of the added input rows. This means that to compute its output, the transform does the following: it reads only the rows newly added to its input, transforms them, and appends the resulting rows to the existing output.

Changing column types, formatting dates as strings, and filtering are all examples of append-only computations. In these examples, each added input row is transformed or filtered out to generate the output rows.
Notice that the only change needed to make an append-only transform incremental is adding the incremental() decorator.

When running incrementally, the default read mode of added means the transform reads only the new students, and the default write mode of modify means the transform appends only the filtered new students to the output.

When running non-incrementally, the default read mode of added means the transform reads the full input, and the default write mode of replace means the transform replaces the output with the full set of filtered students.
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(students, processed):
    new_students_df = students.dataframe()
    processed.write_dataframe(
        new_students_df.filter(new_students_df.hair == 'Brown')
    )
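As a further illustration of an append-only computation, the sketch below (not part of the original example set; the input path, output path, and enrollment_date column are hypothetical) formats a date column as a string for each newly added row:

from transforms.api import transform, incremental, Input, Output
from pyspark.sql import functions as F

@incremental()
@transform(
    students=Input('/examples/students_raw'),                # hypothetical input path
    formatted=Output('/examples/students_dates_formatted')   # hypothetical output path
)
def incremental_format_dates(students, formatted):
    new_students_df = students.dataframe()  # default read mode 'added'
    formatted.write_dataframe(
        # Assumes an enrollment_date column exists on the input.
        new_students_df.withColumn('enrollment_date',
                                   F.date_format('enrollment_date', 'yyyy-MM-dd'))
    )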
Sometimes a transform needs to refer to its previous output in order to incrementally compute an update. An example of this is the distinct() ↗ method.

To remove duplicate rows in a transform (assuming the current output is correct), the transform must de-duplicate any new rows in the input and then check that those rows do not already exist in the output.
@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_distinct(students, processed):
    new_students_df = students.dataframe()
    processed.write_dataframe(
        new_students_df.distinct().subtract(
            processed.dataframe('previous', schema=new_students_df.schema)
        )
    )
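If duplicates should instead be detected by a key rather than by comparing entire rows, the same previous-output pattern can be combined with a left anti join. The variation below is only a sketch, assuming id uniquely identifies a student; it is not part of the original examples:

from transforms.api import transform, incremental, Input, Output

@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_distinct_by_id(students, processed):
    new_students_df = students.dataframe()
    previous_df = processed.dataframe('previous', schema=new_students_df.schema)

    # Keep only the new rows whose 'id' does not already exist in the previous output.
    processed.write_dataframe(
        new_students_df.dropDuplicates(['id']).join(previous_df, on='id', how='left_anti')
    )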
Here we make use of the previous read mode on the output dataset. This returns the DataFrame ↗ that was output during the last build. Since it is possible that there is no previous output, we have to provide a schema to the dataframe('previous') call so that an empty DataFrame can be correctly constructed.

When running this code, the schema of the output dataset will be automatically inferred from the data. This includes automatically detecting the names of the columns, their types, their "nullability" (see StructField ↗), and the order of the columns. To ensure the reliability of your build, it is best practice to hardcode the expected schema of your DataFrame instead of relying on Spark inference.
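For example, an explicit schema for the example students data could be declared once and passed to the dataframe('previous') call instead of reusing an inferred schema. This is only a sketch; the column names and types below are taken from the example data above:

from pyspark.sql import types as T

# Explicit schema matching the example students data (id, hair, eye, sex).
STUDENTS_SCHEMA = T.StructType([
    T.StructField('id', T.IntegerType(), True),
    T.StructField('hair', T.StringType(), True),
    T.StructField('eye', T.StringType(), True),
    T.StructField('sex', T.StringType(), True),
])

# Inside the transform, use the hardcoded schema rather than the inferred one:
# processed.dataframe('previous', schema=STUDENTS_SCHEMA)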
There are some transformations that always replace their entire output. Yet often, these transforms can still benefit from incremental computation. One such example is aggregating statistics, such as counting the number of times each distinct value occurs in a column.
from pyspark.sql import functions as F

@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_group_by(students, processed):
    # Compute the hair color counts for only the new students.
    new_hair_counts_df = students.dataframe().groupBy('hair').count()

    # Union with the old counts.
    out_schema = new_hair_counts_df.schema
    all_counts_df = new_hair_counts_df.union(
        processed.dataframe('previous', schema=out_schema)
    )

    # Group by hair color, summing the two sets of counts.
    totals_df = all_counts_df.groupBy('hair').agg(F.sum('count').alias('count'))

    # To fully replace the output, we always set the output mode to 'replace'.
    # Checkpoint the totals dataframe before changing the output mode.
    totals_df.localCheckpoint(eager=True)
    processed.set_mode('replace')
    processed.write_dataframe(totals_df.select(out_schema.fieldNames()))
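To make the merge step concrete, here is a tiny plain-Python illustration (with made-up counts) of what the union followed by the summing groupBy computes across two runs:

# Hypothetical counts: what the output already contains vs. the newly added students.
previous_counts = {'Brown': 2, 'Blond': 1}
new_counts = {'Brown': 1, 'Blond': 2}

# The union + groupBy + sum in the transform adds the two sets of counts together.
totals = {hair: previous_counts.get(hair, 0) + new_counts.get(hair, 0)
          for hair in set(previous_counts) | set(new_counts)}
print(totals)  # {'Brown': 3, 'Blond': 3} (key order may vary)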
Again, since it is possible that there is no previous output, we have to provide a schema to the dataframe('previous') call so that an empty DataFrame can be correctly constructed.
In some cases it is necessary to incrementally update a dataset from an input whose schema may change, for example when the input comes from a source table where columns may be added or removed over time. In this case we need to read the previous output with its old schema and then manually reconcile it with the new schema.

We still need to define how the transform should behave when it runs in SNAPSHOT mode, meaning there is no previous output. In that case, calling processed.dataframe('current') would fail.

The example below shows how to add new students into the existing students_processed dataset, even when the new students dataset's schema is different.
from pyspark.sql import functions as F

@incremental(
    snapshot_inputs=['students']
)
@transform(
    # The schema of this dataset may change. Note this means it must be a SNAPSHOT
    # transaction, so we include it in the "snapshot_inputs" parameter of the
    # @incremental decorator.
    students=Input('/examples/students_raw'),
    processed=Output('/examples/students_processed')
)
def incremental_group_by(students, processed, ctx):
    if not ctx.is_incremental:
        # This case needs to be handled independently
        # ...
        return

    # Read the old processed dataframe with its associated schema.
    # As we haven't written to processed yet, 'current' will give us the previous transaction.
    # Note: It is important here that the write mode of 'processed' is still 'modify' at this
    # point in the code path (see the note below).
    students_previous = processed.dataframe('current').localCheckpoint()

    # Merge the old and new dataframes, setting missing columns to null.
    students_all = students_previous.unionByName(students.dataframe(), allowMissingColumns=True)

    # Write out the new combined dataframe.
    processed.set_mode('replace')
    processed.write_dataframe(students_all)
Note that the evaluation of the read mode in the .dataframe() call is lazy (call-by-need), meaning that evaluation is delayed until the value is needed. The read is resolved according to the write mode of the dataset at the time of the write_dataframe call, so any write modes set earlier are ignored. Calling .localCheckpoint(eager=True) forces the data to be read and the write mode to be evaluated at that point, and never recomputed.
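The safe ordering can be summarised in the following sketch, which condenses the example above rather than adding new behaviour (the dataset paths are reused from that example): read and checkpoint the previous output while the write mode is still 'modify', and only then switch to 'replace'.

from transforms.api import transform, incremental, Input, Output

@incremental(snapshot_inputs=['students'])
@transform(
    students=Input('/examples/students_raw'),
    processed=Output('/examples/students_processed')
)
def merge_with_previous(students, processed, ctx):
    if not ctx.is_incremental:
        processed.write_dataframe(students.dataframe())
        return

    # 1. Resolve the lazy read NOW, while the write mode is still the default 'modify'.
    previous_df = processed.dataframe('current').localCheckpoint(eager=True)

    # 2. Only after the checkpoint, change the write mode. Without step 1, the read
    #    would be resolved during write_dataframe(), after set_mode('replace'), and
    #    would no longer reflect the previous output.
    processed.set_mode('replace')
    processed.write_dataframe(
        previous_df.unionByName(students.dataframe(), allowMissingColumns=True)
    )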
Let's assume you have two tables, Orders submitted by the customer and Deliveries that have been completed, and we would like to compute a table DeliveryDuration with the time it takes for items to be delivered. Even though both the Orders and Deliveries tables only ever get new rows appended and no rows are ever modified, a simple join between the two incremental datasets will not work. For example, the Orders table might contain orderIds that are not yet present in the Deliveries table.
Orders:                                 Deliveries:                          DeliveryDuration:

+---------+---------------+             +---------+--------------+           +---------+------------------+
| orderId | submittedDate |             | orderId | deliveryDate |           | orderId | deliveryDuration |
+---------+---------------+   join on   +---------+--------------+   ---->   +---------+------------------+
| 1001    | 2019-08-21    |   orderId   | 1001    | 2019-08-23   |           | 1001    | 2                |
+---------+---------------+             +---------+--------------+           +---------+------------------+
| 1002    | 2019-08-22    |
+---------+---------------+
| 1003    | 2019-08-23    |
+---------+---------------+
Assuming orderId is strictly increasing in both the Orders and Deliveries tables, we can check the last orderId for which we computed deliveryDuration (maxComputedOrderId) and only take the rows from the Orders and Deliveries tables with an orderId greater than maxComputedOrderId:
from transforms.api import transform, Input, Output, incremental
from pyspark.sql import types as T
from pyspark.sql import functions as F

@incremental(snapshot_inputs=['orders', 'deliveries'])
@transform(
    orders=Input('/example/Orders'),
    deliveries=Input('/example/Deliveries'),
    delivery_duration=Output('/example/New_Delivery_Date')
)
def compute_delivery_duration(orders, deliveries, delivery_duration):

    def to_fields(datatype, names, nullable=True):
        return [T.StructField(n, datatype, nullable) for n in names]

    # Generate a schema to pass for deliveryDuration.
    # Explicitly define the schema, as you can't refer to the previous version's schema.
    fields = to_fields(T.IntegerType(), ['orderId', 'deliveryDuration'])

    maxComputedOrderId = (
        delivery_duration
        .dataframe('previous', schema=T.StructType(fields))
        .groupby()
        .max('orderId')
        .collect()[0][0]
    )

    # At the first iteration, maxComputedOrderId is None because the delivery_duration dataset doesn't exist yet.
    if maxComputedOrderId is None:
        maxComputedOrderId = 0

    ordersNotProcessed = orders.dataframe().filter(F.col('orderId') > maxComputedOrderId)
    deliveriesNotProcessed = deliveries.dataframe().filter(F.col('orderId') > maxComputedOrderId)

    newDurations = (
        ordersNotProcessed
        .join(deliveriesNotProcessed, 'orderId', how='left')
        .withColumn('deliveryDuration', F.datediff(F.col('deliveryDate'), F.col('submittedDate')))
        .drop(*['submittedDate', 'deliveryDate'])
    )

    delivery_duration.write_dataframe(newDurations)
Let's say we would like to add another column to our incremental dataset from now on. Adding another column to the output will not invalidate the is_incremental flag, so the next run will compute the new rows and write them with the new column; the column will be null in all rows written previously.

However, we might want to populate the column for previous rows as well. Increasing the semantic_version of the transform will make it run non-incrementally once; since the 'added' read mode then returns the full input, you can recompute all rows and populate the new column.
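A minimal sketch of such a version bump follows (the hair_eye column and the transform body are placeholders; the relevant part is the semantic_version argument). The next build after this change runs non-incrementally once, so the full input is read and the new column is populated for every row.

from transforms.api import transform, incremental, Input, Output
from pyspark.sql import functions as F

@incremental(semantic_version=2)  # bumped from 1 after the logic change
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(students, processed):
    students_df = students.dataframe()  # full input on the one-off non-incremental run
    processed.write_dataframe(
        students_df.filter(students_df.hair == 'Brown')
                   .withColumn('hair_eye', F.concat_ws('-', 'hair', 'eye'))  # hypothetical new column
    )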
If your transform has been creating a historical dataset from a snapshot input, things become slightly more complicated, as the previous data sits in a stack of snapshot transactions on your input. In this case, contact your Palantir representative.

In this example, we discussed adding a new column, but the above reasoning applies to all sorts of logic changes.

Creating a new branch and running a build on it will run the build incrementally: the last transaction committed on the branch you branched from is treated as the previous transaction for the first build on the new branch.
We saw how to process data incrementally by appending transformed rows to the output, by referring to the previous output (for example, to de-duplicate rows), and by replacing the output with incrementally maintained aggregates.

We also explored how to handle inputs whose schema changes, how to join two append-only inputs, and how to roll out logic changes such as adding a new column.
To understand incremental errors, it helps (and is sometimes necessary) to be familiar with the concepts of transactions and dataset views.
When a job runs incrementally, its incremental input datasets only consist of the unprocessed transactions range, not the full dataset view.
Imagine the following transaction history for a dataset:
SNAPSHOT (1) --> UPDATE (2) --> UPDATE (3) --> UPDATE (4) --> UPDATE (5)
                                    |
                         Last processed transaction
The last time the dataset was built, the latest transaction was (3). Since then, transactions (4) and (5) have been committed, so the unprocessed transaction range is (4) — (5).
The dataset view is the transaction range (1) — (5). The transaction "on top" of the view (the most recent) is sometimes referred to as the branch's HEAD, by analogy with git. Like in git, a branch is a pointer to a transaction, so we say that the branch points at transaction (5). Several branches can point at different transactions, and branches may share a transaction history:
SNAPSHOT (1) ─> UPDATE (2) ─> UPDATE (3) ─> UPDATE (4) ─> UPDATE (5) [develop]
                   |
                   └─> UPDATE [feature-branch]
Catalog:TransactionNotInView
In order for the job to run incrementally, a series of checks is run at the beginning of the job. One of these checks verifies that the unprocessed transaction range is strictly incremental (i.e., append-only file changes; see the requirements for incremental computation). It does so by comparing the files in the unprocessed transaction range with those in the processed transaction range.
However, if the branch's HEAD has been moved, the incremental job is now in an inconsistent state: it no longer makes sense to compare the two ranges, so a Catalog:TransactionNotInView error is thrown.

See below for a diagram of how this error can occur:
SNAPSHOT (1) ─> UPDATE (2) ─> UPDATE (3) ─> UPDATE (4) ─> UPDATE (5)
                   |          (last processed             (branch's previous
                   |           transaction)                HEAD, now orphan)
                   |
                   └─> UPDATE (6) ─> UPDATE (7, branch's current HEAD)
Here the processed transaction range is (1) — (5), the current branch's HEAD points at (7), and the current view consists of transactions (1), (2), (6), and (7).

This is an inconsistent state because not all processed transactions are upstream of the branch's HEAD: indeed, (3) is not. In other words, the previous HEAD (3) is no longer part of the current view, which is why Catalog:TransactionNotInView is thrown.
Catalog:InconsistentBranchesOrder
The other Catalog error that can be thrown is Catalog:InconsistentBranchesOrder, raised when the last processed transaction (prevTransaction) is greater than the branch HEAD (endTransaction). This can happen if the HEAD of the dataset is moved to a transaction before the previous transaction.

See below for a diagram of how this error can occur:
SNAPSHOT (1) --> UPDATE (2) --> UPDATE (3) --> UPDATE (4) --> UPDATE (5)
                     |                                            |
                Current HEAD                        Last processed transaction
A branch's HEAD can change for two reasons: a new transaction is committed on the branch, or the HEAD is explicitly moved to a different transaction. Changes that commit a SNAPSHOT transaction cause the next build to run non-incrementally, so they cannot lead to an inconsistent state.

In order to remediate this, you will need to either run the transform non-incrementally once so that the output is fully recomputed (for example, by increasing its semantic_version), or move the branch's HEAD back using the updateBranch2 endpoint with the latest processed transaction as the parentRef. Note that we only recommend the use of this endpoint for experienced users.