Examples

This section contains a wide range of examples of incrementally computable transforms:

The examples make use of two inputs to demonstrate incremental computation: students and students_updated. The students input contains 3 students and is not incremental. This means it has no history:

>>> students.dataframe('previous').sort('id').show()
+---+----+---+---+
| id|hair|eye|sex|
+---+----+---+---+
+---+----+---+---+
>>>
>>> students.dataframe('current').sort('id').show()
+---+-----+-----+------+
| id| hair|  eye|   sex|
+---+-----+-----+------+
|  1|Brown|Green|Female|
|  2|  Red| Blue|  Male|
|  3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> students.dataframe('added').sort('id').show()
+---+-----+-----+------+
| id| hair|  eye|   sex|
+---+-----+-----+------+
|  1|Brown|Green|Female|
|  2|  Red| Blue|  Male|
|  3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> # Recall that the default read mode for inputs is 'added'
>>> students.dataframe('added') is students.dataframe()
True

The students_updated input is the same as students but with an additional update that contains three extra students. This update makes the input incremental. Therefore, it has a non-empty previous DataFrame.

>>> students_updated.dataframe('previous').sort('id').show()
+---+-----+-----+------+
| id| hair|  eye|   sex|
+---+-----+-----+------+
|  1|Brown|Green|Female|
|  2|  Red| Blue|  Male|
|  3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> students_updated.dataframe('current').sort('id').show()
+---+-----+-----+------+
| id| hair|  eye|   sex|
+---+-----+-----+------+
|  1|Brown|Green|Female|
|  2|  Red| Blue|  Male|
|  3|Blond|Hazel|Female|
|  4|Brown|Green|Female|
|  5|Brown| Blue|  Male|
|  6|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> students_updated.dataframe('added').sort('id').show()
+---+-----+-----+------+
| id| hair|  eye|   sex|
+---+-----+-----+------+
|  4|Brown|Green|Female|
|  5|Brown| Blue|  Male|
|  6|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> # Recall that the default read mode for inputs is 'added'
>>> students_updated.dataframe('added') is students_updated.dataframe()
True

Append

An append-only incremental computation is one where the added output rows are a function only of the added input rows. This means that to compute its output, the transform does the following:

  • Looks at any newly added input data,
  • Computes any new output rows, which are a function only of these added input rows, and
  • Appends the new output to the existing output.

Changing column types, formatting dates as strings, and filtering are all examples of append-only computations. In these examples, each added input row is transformed or dropped independently to generate the output rows.

Notice that the only change needed to make an append-only transform incremental is the @incremental() decorator.

When running incrementally, the default read mode of added means the transform reads only the new students, and the default write mode of modify means the transform appends only the filtered new students to the output.

When running non-incrementally, the default read mode of added means the transform reads the full input, and the default write mode of replace means the transform replaces the output with the full set of filtered students.

from transforms.api import transform, incremental, Input, Output

@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(students, processed):
    new_students_df = students.dataframe()
    processed.write_dataframe(
        new_students_df.filter(new_students_df.hair == 'Brown')
    )
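
For illustration, the same pattern applies to the other append-only computations mentioned above, such as formatting dates as strings. The following is a minimal sketch only: the dataset paths and the order_date column are hypothetical and are not part of the example datasets used in this section.

from transforms.api import transform, incremental, Input, Output
from pyspark.sql import functions as F

@incremental()
@transform(
    orders=Input('/examples/orders_raw'),            # hypothetical input
    formatted=Output('/examples/orders_formatted')   # hypothetical output
)
def incremental_format_dates(orders, formatted):
    new_orders_df = orders.dataframe()

    # Each added input row is transformed independently, so the computation stays append-only:
    # new rows are read with the default 'added' read mode and appended with the default 'modify' write mode.
    formatted.write_dataframe(
        new_orders_df.withColumn('order_date', F.date_format('order_date', 'yyyy-MM-dd'))
    )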

Merge and append

Sometimes a transform needs to refer to its previous output in order to incrementally compute an update. An example of this is the distinct() method. To remove duplicate rows in a transform (assuming the current output is correct), the transform must de-duplicate any new rows in the input, and then check that those rows do not already exist in the output.

@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_distinct(students, processed):
    new_students_df = students.dataframe()
    processed.write_dataframe(
        new_students_df.distinct().subtract(
            processed.dataframe('previous', schema=new_students_df.schema)
        )
    )

Here we make use of the previous read mode on the output dataset. This returns the DataFrame that was output during the last build. Since it is possible that there is no previous output, we have to provide a schema to the dataframe('previous') call so that an empty DataFrame can be correctly constructed.

When running this code, the schema of the output dataset will be automatically inferred from the data. This includes automatically detecting the names of columns, their types, their "nullability" (see StructField), and the order of the columns. To ensure the reliability of your build, it is best practice to hardcode the expected schema of your DataFrame instead of relying on Spark inference.
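
As a minimal sketch of this best practice, a hardcoded schema for the students output could be built with pyspark.sql.types and passed to the 'previous' read instead of new_students_df.schema (the column types here are assumptions based on the example data above):

from pyspark.sql import types as T

# Hardcoded expected output schema (the exact types are assumed for illustration).
STUDENTS_SCHEMA = T.StructType([
    T.StructField('id', T.IntegerType(), True),
    T.StructField('hair', T.StringType(), True),
    T.StructField('eye', T.StringType(), True),
    T.StructField('sex', T.StringType(), True),
])

# Inside the transform, this replaces the inferred schema:
# previous_df = processed.dataframe('previous', schema=STUDENTS_SCHEMA)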

Merge and replace

There are some transformations that always replace their entire output. Yet these transforms can often still benefit from incremental computation. One such example is aggregating statistics, such as counting the number of times each distinct value occurs in a column.

from pyspark.sql import functions as F

@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_group_by(students, processed):
    # Compute the hair color counts for only the new students.
    new_hair_counts_df = students.dataframe().groupBy('hair').count()

    # Union with the old counts.
    out_schema = new_hair_counts_df.schema
    all_counts_df = new_hair_counts_df.union(
        processed.dataframe('previous', schema=out_schema)
    )

    # Group by hair color, summing the two sets of counts.
    totals_df = all_counts_df.groupBy('hair').agg(F.sum('count').alias('count'))

    # To fully replace the output, we always set the output mode to 'replace'.
    # Checkpoint the totals dataframe before changing the output mode.
    totals_df.localCheckpoint(eager=True)
    processed.set_mode('replace')
    processed.write_dataframe(totals_df.select(out_schema.fieldNames()))

Again, since it is possible that there is no previous output, we have to provide a schema to the dataframe('previous') call so that an empty DataFrame can be correctly constructed.

Merge and replace with schema change

In some cases it is required to incrementally update a dataset with an input whose schema may change. One example is when the input comes from a source table where columns may be added or removed over time. In this case we need to read the previous output with its old schema and then manually reconcile it with the new schema.

We still need to define how the transform should behave when it runs in SNAPSHOT mode, meaning there is no previous output. In this case, calling processed.dataframe('current') would fail.

The example below shows how to add new students into the existing students_processed dataset, even when the new students dataset's schema is different.

from pyspark.sql import functions as F

@incremental(
    snapshot_inputs=['students']
)
@transform(
    # The schema of this dataset may change. Note this means it must be a SNAPSHOT transaction,
    # so we include it in the "snapshot_inputs" parameter of the @incremental decorator.
    students=Input('/examples/students_raw'),
    processed=Output('/examples/students_processed')
)
def incremental_group_by(students, processed, ctx):
    if not ctx.is_incremental:
        # This case needs to be handled independently
        # ...
        return

    # Read the old processed dataframe with its associated schema.
    # As we haven't written to 'processed' yet, 'current' will give us the previous transaction.
    # Note: It is important here that the write mode of 'processed' is still 'modify' at this point
    # in the code path (see warning below).
    students_previous = processed.dataframe('current').localCheckpoint()

    # Merge the old and new dataframes, setting missing columns to null.
    students_all = students_previous.unionByName(students.dataframe(), allowMissingColumns=True)

    # Write out the new combined dataframe.
    processed.set_mode('replace')
    processed.write_dataframe(students_all)
Warning

Note that the evaluation of the read mode in the .dataframe() call is lazy (call-by-need), meaning that evaluation is delayed until the value is needed. The result of reading the dataframe is evaluated according to the write mode of the dataset at the time of the write_dataframe call, so any write mode set earlier is ignored. Calling .localCheckpoint(eager=True) forces the data to be read and the write mode to be evaluated at that point, and never recomputed.
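
As a minimal sketch of the pitfall described in this warning (variable names are illustrative and follow the example above):

# Without a checkpoint, this read is not evaluated yet...
students_previous = processed.dataframe('current')
processed.set_mode('replace')
# ...so when write_dataframe finally evaluates it, the 'replace' write mode is already in
# effect and the previous output is no longer visible.
processed.write_dataframe(students_previous.unionByName(students.dataframe(), allowMissingColumns=True))

# Forcing evaluation while the write mode is still 'modify' avoids the problem:
students_previous = processed.dataframe('current').localCheckpoint(eager=True)
processed.set_mode('replace')
processed.write_dataframe(students_previous.unionByName(students.dataframe(), allowMissingColumns=True))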

Leveraging incremental transforms to join large datasets

Let's assume we have two tables, Orders (orders submitted by customers) and Deliveries (deliveries that have been completed), and we would like to compute a DeliveryDuration table with the time it takes for items to be delivered. Even though both the Orders and Deliveries tables will only get new rows appended and no rows will ever be modified, a simple join between the two incremental datasets will not work. For example, the Orders table might contain orderIds that are not yet present in the Deliveries table.

Orders:                               Deliveries:
+---------+---------------+           +---------+--------------+           +---------+------------------+
| orderId | submittedDate |           | orderId | deliveryDate |           | orderId | deliveryDuration |
+---------+---------------+           +---------+--------------+   ---->   +---------+------------------+
| 1001    | 2019-08-21    |  join on  | 1001    | 2019-08-23   |           | 1001    | 2                |
+---------+---------------+  orderId  +---------+--------------+           +---------+------------------+
| 1002    | 2019-08-22    |
+---------+---------------+
| 1003    | 2019-08-23    |
+---------+---------------+

Assuming orderId is strictly increasing in both the Orders and Deliveries tables, we can look up the last orderId for which we computed a deliveryDuration (maxComputedOrderId) and take only the rows from the Orders and Deliveries tables with an orderId greater than maxComputedOrderId:

from transforms.api import transform, Input, Output, incremental
from pyspark.sql import types as T
from pyspark.sql import functions as F

@incremental(snapshot_inputs=['orders', 'deliveries'])
@transform(
    orders=Input('/example/Orders'),
    deliveries=Input('/example/Deliveries'),
    delivery_duration=Output('/example/New_Delivery_Date')
)
def compute_delivery_duration(orders, deliveries, delivery_duration):

    def to_fields(datatype, names, nullable=True):
        return [T.StructField(n, datatype, nullable) for n in names]

    # Generate a schema to pass for deliveryDuration.
    fields = to_fields(T.IntegerType(), ['orderId', 'deliveryDuration'])

    # Explicitly define the schema, as you can't refer to the previous version's schema.
    maxComputedOrderId = (
        delivery_duration
        .dataframe('previous', schema=T.StructType(fields))
        .groupby()
        .max('orderId')
        .collect()[0][0]
    )

    # At the first iteration, maxComputedOrderId is None because the delivery_duration dataset doesn't exist yet.
    if maxComputedOrderId is None:
        maxComputedOrderId = 0

    ordersNotProcessed = orders.dataframe().filter(F.col('orderId') > maxComputedOrderId)
    deliveriesNotProcessed = deliveries.dataframe().filter(F.col('orderId') > maxComputedOrderId)

    newDurations = (
        ordersNotProcessed
        .join(deliveriesNotProcessed, 'orderId', how='left')
        .withColumn('deliveryDuration', F.datediff(F.col('deliveryDate'), F.col('submittedDate')))
        .drop(*['submittedDate', 'deliveryDate'])
    )

    delivery_duration.write_dataframe(newDurations)

Handling schema or logic changes

Let's say we would like to add another column to our incremental dataset from now on. Adding a column to the output won't invalidate the is_incremental flag, so the next run will compute the new rows and write them with the new column; the column will be null in all the rows written previously.

However, we might want to populate the column for previous rows as well. Increasing the semantic_version of the transform will make it run non-incrementally once; if you are using the 'added' read mode, the input will then contain all the data, enabling you to recompute the output with the new column.
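
For example, assuming the transform previously declared semantic_version=1, bumping it in the @incremental decorator forces the next build to run as a snapshot (a sketch based on the filter example above):

@incremental(semantic_version=2)  # bumped from 1; the next build runs non-incrementally once
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(students, processed):
    ...  # unchanged transform logic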

If your transform has been creating a historical dataset from snapshot input, then it becomes slightly more complicated, as the previous data is in a stack of snapshot transactions on your input. In this case, contact your Palantir representative.

In this example, we discussed adding a new column, but the above reasoning applies to all sorts of logic changes.

Developing incremental code on a branch

Creating a new branch and running a build on it will run the build incrementally. The last transaction committed on the branch you branched off from will be treated as the previous transaction for the first build on the new branch.

Summary of examples

We saw how to process data incrementally by:

  • getting newly added rows, processing them, and appending them to the output
  • getting newly added rows, filtering them based on rows already present in the output, and appending them to the output
  • getting newly added rows, computing an aggregation based on the new rows and rows already present in the output, and replacing the output with the new aggregated statistics
  • leveraging incremental transforms to join large datasets

We also explored how to:

  • handle schema or logic changes of an incremental transform
  • develop incremental code on a branch without rebuilding based on full content of inputs

Incremental Python errors

To understand incremental errors, it helps (and is sometimes necessary) to have read about the concepts of transactions and dataset views.

Catalog transaction errors

Useful context

When a job runs incrementally, its incremental input datasets consist only of the unprocessed transaction range, not the full dataset view.

Imagine the following transaction history for a dataset:

SNAPSHOT (1) --> UPDATE (2) --> UPDATE (3) --> UPDATE (4) --> UPDATE (5)
                                   |
                       Last processed transaction

The last time the dataset was built, the latest transaction was (3). Since then, transactions (4) and (5) have been committed, therefore the unprocessed transaction range is (4) — (5).

The dataset view is the transaction range (1) — (5). The transaction “on top” of the view (the most recent one) is sometimes referred to as the branch’s HEAD, by analogy with git. As in git, a branch is a pointer to a transaction, so we say that the branch points at transaction (5). Different branches can point at different transactions, and branches may share part of their transaction history:

SNAPSHOT (1) ─> UPDATE (2) ─> UPDATE (3) ─> UPDATE (4) ─> UPDATE (5)     [develop]
                                 |
                                 └─> UPDATE                              [feature-branch]

Error: Catalog:TransactionsNotInView

In order for the job to run incrementally, a series of checks is run at the beginning of the job. One of these checks verifies that the unprocessed transaction range is strictly incremental (i.e., append-only file changes; see the requirements for incremental computation). It does so by comparing the files in the unprocessed transaction range with those in the processed transaction range.

However, if the branch’s HEAD has been moved, the incremental job is now in an inconsistent state: it no longer makes sense to compare the two ranges, so a Catalog:TransactionsNotInView error is thrown.

See the diagram below for how this error can occur:

SNAPSHOT (1) ─> UPDATE (2) ─> UPDATE (3)      ─> UPDATE (4) ─> UPDATE (5)
                   |          (last processed                  (branch's previous
                   |            transaction)                     HEAD, now orphan)
                   |
                   └─> UPDATE (6) --> UPDATE (7, branch's current HEAD)

Here the processed transaction range is (1) — (3), the current branch's HEAD points at (7), and the current view consists of transactions (1), (2), (6), and (7).

This is an inconsistent state because not all processed transactions are upstream of the branch’s HEAD: indeed, (3) is not. In other words, the last processed transaction (3) is no longer part of the current view, which is why Catalog:TransactionsNotInView is thrown.

Error: Catalog:InconsistentBranchesOrder

The other Catalog error that can be thrown is Catalog:InconsistentBranchesOrder, which occurs when the last processed transaction (prevTransaction) is greater than the branch’s HEAD (endTransaction). This can happen if the HEAD of the dataset is moved to a transaction that comes before the last processed transaction.

See the diagram below for how this error can occur:

SNAPSHOT (1) --> UPDATE (2) --> UPDATE (3) --> UPDATE (4) --> UPDATE (5)
                                   |                             |
                               Current HEAD            Last processed transaction

Remediation of errors

A branch’s HEAD can change for two reasons:

  • A user knowingly updated the branch’s HEAD using Catalog endpoints.
  • Some transactions were not committed through a transform job. For example, when you merge branches in Code Workbook, the dataset is also “merged”. (However, transactions on Code Workbook datasets are always SNAPSHOT, so they cannot lead to an inconsistent state.)

In order to remediate this, you will need to either:

  • Run the transform as a snapshot; for example, by bumping the semantic version. This starts a new dataset view, thereby resetting the incremental check mentioned above.
  • Manually update the branch’s HEAD to point at a transaction which is downstream of the already-processed range. This must be done using the updateBranch2 endpoint with the latest processed transaction as the parentRef. Note that we only recommend the use of this endpoint for experienced users.