This section contains a wide range of examples of incrementally computable transforms.
The examples make use of two inputs to demonstrate incremental computation: students and students_updated. The students input contains three students and is not incremental, which means it has no history:
>>> students.dataframe('previous').sort('id').show()
+---+----+---+---+
| id|hair|eye|sex|
+---+----+---+---+
+---+----+---+---+
>>>
>>> students.dataframe('current').sort('id').show()
+---+-----+-----+------+
| id| hair| eye| sex|
+---+-----+-----+------+
| 1|Brown|Green|Female|
| 2| Red| Blue| Male|
| 3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> students.dataframe('added').sort('id').show()
+---+-----+-----+------+
| id| hair| eye| sex|
+---+-----+-----+------+
| 1|Brown|Green|Female|
| 2| Red| Blue| Male|
| 3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> # Recall that the default read mode for inputs is 'added'
>>> students.dataframe('added') is students.dataframe()
True
The students_updated input is the same as students but with an additional update that contains three extra students. This update makes the input incremental. Therefore, it has a non-empty previous DataFrame.
>>> students_updated.dataframe('previous').sort('id').show()
+---+-----+-----+------+
| id| hair| eye| sex|
+---+-----+-----+------+
| 1|Brown|Green|Female|
| 2| Red| Blue| Male|
| 3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> students_updated.dataframe('current').sort('id').show()
+---+-----+-----+------+
| id| hair| eye| sex|
+---+-----+-----+------+
| 1|Brown|Green|Female|
| 2| Red| Blue| Male|
| 3|Blond|Hazel|Female|
| 4|Brown|Green|Female|
| 5|Brown| Blue| Male|
| 6|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> students_updated.dataframe('added').sort('id').show()
+---+-----+-----+------+
| id| hair| eye| sex|
+---+-----+-----+------+
| 4|Brown|Green|Female|
| 5|Brown| Blue| Male|
| 6|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> # Recall that the default read mode for inputs is 'added'
>>> students_updated.dataframe('added') is students_updated.dataframe()
True
An append-only incremental computation is one where the added output rows are a function only of the added input rows. This means that to compute its output, the transform only needs to read the newly added input rows, transform them, and append the resulting rows to the existing output.
Changing column types, formatting dates as strings, and filtering are all examples of append-only computations: each added input row is transformed, or filtered out, to generate the output rows.
Notice that the only change needed to make an append-only transform incremental is the incremental() decorator.
When running incrementally, the default read mode of added means the transform reads only the new students, and the default write mode of modify means the transform appends only the filtered new students to the output.
When running non-incrementally, the default read mode of added means the transform reads the full input, and the default write mode of replace means the transform replaces the output with the full set of filtered students.
from transforms.api import transform, incremental, Input, Output


@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(students, processed):
    new_students_df = students.dataframe()
    processed.write_dataframe(
        new_students_df.filter(new_students_df.hair == 'Brown')
    )
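As a further illustration of the append-only pattern, here is a minimal sketch of a transform that formats a date column as a string. The dataset paths and the event_date column are assumptions made for this example, not datasets used elsewhere in this section:

from pyspark.sql import functions as F
from transforms.api import transform, incremental, Input, Output


@incremental()
@transform(
    raw=Input('/examples/raw_events'),              # hypothetical input
    formatted=Output('/examples/formatted_events')  # hypothetical output
)
def incremental_format_dates(raw, formatted):
    new_rows_df = raw.dataframe()

    # Render the assumed 'event_date' timestamp column as a 'yyyy-MM-dd' string.
    formatted.write_dataframe(
        new_rows_df.withColumn('event_date', F.date_format('event_date', 'yyyy-MM-dd'))
    )

Because each output row depends only on its corresponding input row, the same default read and write modes apply here as in the filter example above.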
Sometimes a transform needs to refer to its previous output in order to incrementally compute an update. An example of this is the distinct() method.
To remove duplicate rows in a transform (assuming the current output is correct), the transform must de-duplicate any new rows in the input and then check that those rows do not already exist in the output.
@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_distinct(students, processed):
    new_students_df = students.dataframe()
    processed.write_dataframe(
        new_students_df.distinct().subtract(
            processed.dataframe('previous', schema=new_students_df.schema)
        )
    )
Here we make use of the previous read mode on the output dataset. This returns the DataFrame that was output during the last build. Since it is possible that there is no previous output, we have to provide a schema to the dataframe('previous') call so that an empty DataFrame can be correctly constructed.
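If a convenient source for the schema is not available (for example, when the output columns differ from the input columns), an explicit StructType can be passed instead. Below is a minimal sketch of what the call inside incremental_distinct could look like, with the field list written out by hand (the exact field types are an assumption):

from pyspark.sql import types as T

output_schema = T.StructType([
    T.StructField('id', T.LongType()),
    T.StructField('hair', T.StringType()),
    T.StructField('eye', T.StringType()),
    T.StructField('sex', T.StringType()),
])

# Returns the previous output, or an empty DataFrame with this schema
# if the transform has never written any output before.
previous_df = processed.dataframe('previous', schema=output_schema)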
There are some transformations that always replace their entire output, yet these transforms can often still benefit from incremental computation. One such example is aggregating statistics, such as counting the number of times each distinct value occurs in a column.
from pyspark.sql import functions as F


@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_group_by(students, processed):
    # Compute the hair color counts for only the new students.
    new_hair_counts_df = students.dataframe().groupBy('hair').count()

    # Union with the old counts
    out_schema = new_hair_counts_df.schema
    all_counts_df = new_hair_counts_df.union(
        processed.dataframe('previous', schema=out_schema)
    )

    # Group by hair color, summing the two sets of counts.
    totals_df = all_counts_df.groupBy('hair').agg(F.sum('count').alias('count'))

    # To fully replace the output, we always set the output mode to 'replace'.
    processed.set_mode('replace')
    processed.write_dataframe(totals_df.select(out_schema.fieldNames()))
Again, since it is possible that there is no previous output, we have to provide a schema to the dataframe('previous') call so that an empty DataFrame can be correctly constructed.
Sometimes, an incremental transform needs to create a Spark DataFrame from the files added to a schemaless input dataset and then append the contents of that DataFrame to the output. For such transforms, there are two implementation patterns.
1. Statically specify the expected schema and enforce that the generated DataFrame has that schema (by ignoring extra fields in the input data, filling in nulls for fields missing in the input data, and so on).
2. Dynamically capture whatever fields are present in the input data.
For the dynamic capture implementation pattern, it is necessary to ensure that the DataFrame that is appended to the output has a schema that is compatible with the existing output. For a schema to be compatible with the existing output, the following conditions must be met:
- Columns in the new data that have the same name as columns in the existing output must also have the same type.
- All columns in the existing output must be present in the new data.
- No column in the new data may differ only in capitalization (case) from a column in the existing data; for example, there cannot be a column Value in the new data if there is already a column value in the existing data.
To ensure that these conditions are met, it is necessary to dynamically inspect the schema of the existing data, which means calling dataframe('previous') without specifying a schema. This is supported as long as the transform is being run incrementally.
The below code puts all of these principles together.
from functools import reduce

from pyspark.sql import functions as F
from transforms.api import transform, incremental, Input, Output
from transforms.verbs.dataframes import sanitize_schema_for_parquet


@incremental()
@transform(
    csvs=Input('/examples/dataset_of_csvs'),
    parsed=Output('/examples/parsed_csvs')
)
def incremental_read_csv(ctx, csvs, parsed):
    input_fs = csvs.filesystem()

    def process_file(f):
        df = (
            ctx.spark_session.read
            # Set inferSchema to False so that every column is a string.
            # This prevents issues due to inconsistent inference results between files,
            # both within and across incremental builds.
            .option("inferSchema", False)
            .option("header", True)
            .csv(input_fs.hadoop_path + "/" + f)
        )
        sanitized = sanitize_schema_for_parquet(df)
        # Make all columns lowercase to prevent issues due to inconsistent casing
        # between files, both within and across incremental builds.
        # Note that this logic does not handle the situation
        # where a single file contains columns differing only in case.
        return sanitized.select(*(F.col(a).alias(a.lower()) for a in sanitized.columns))

    dfs = [process_file(f.path) for f in input_fs.ls()]

    if len(dfs) == 0:
        parsed.abort()
        return

    df = reduce(
        lambda a, b: a.unionByName(b, allowMissingColumns=True),
        dfs[1:],
        dfs[0],
    )

    if not ctx.is_incremental:
        parsed.write_dataframe(df)
    else:
        existing_columns = parsed.dataframe("previous").columns
        columns_in_new_data = set(df.columns)
        resolved_schema_df = df.select(
            *df.columns,
            *(
                F.lit(None).cast("string").alias(col)
                for col in existing_columns
                if col not in columns_in_new_data
            )
        )
        parsed.write_dataframe(resolved_schema_df)
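For comparison, a minimal sketch of the first pattern (statically specifying the expected schema) is shown below. The helper keeps only a fixed, assumed set of columns, filling in nulls for any that are missing and dropping any extras; the field names are illustrative and assume column names have already been normalized to lowercase, as in the transform above:

from pyspark.sql import functions as F
from pyspark.sql import types as T

# Hypothetical fixed schema that every parsed file is coerced into.
EXPECTED_SCHEMA = T.StructType([
    T.StructField('order_id', T.StringType()),
    T.StructField('status', T.StringType()),
    T.StructField('updated_at', T.StringType()),
])


def enforce_expected_schema(df):
    # Keep only the expected columns, adding null columns for any that are
    # missing and silently dropping columns not present in EXPECTED_SCHEMA.
    return df.select(*(
        F.col(field.name).cast(field.dataType)
        if field.name in df.columns
        else F.lit(None).cast(field.dataType).alias(field.name)
        for field in EXPECTED_SCHEMA.fields
    ))

With this approach every incremental append has the same schema by construction, so there is no need to inspect the previous output's columns before writing.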
Let's assume you have two tables, Orders (orders submitted by customers) and Deliveries (completed deliveries), and we would like to compute a table DeliveryDuration with the time it takes for items to be delivered. Even though both the Orders and Deliveries tables only ever have new rows appended and no rows are ever modified, a simple join between the two incremental datasets will not work. For example, the Orders table might contain orderIds that are not yet present in the Deliveries table.
Orders:                                Deliveries:
+---------+---------------+            +---------+--------------+            +---------+------------------+
| orderId | submittedDate |            | orderId | deliveryDate |            | orderId | deliveryDuration |
+---------+---------------+  join on   +---------+--------------+   ---->    +---------+------------------+
| 1001    | 2019-08-21    |  orderId   | 1001    | 2019-08-23   |            | 1001    | 2                |
+---------+---------------+            +---------+--------------+            +---------+------------------+
| 1002    | 2019-08-22    |
+---------+---------------+
| 1003    | 2019-08-23    |
+---------+---------------+
Assuming orderId is strictly increasing in both the Orders and Deliveries tables, we can look up the last orderId for which we computed deliveryDuration (maxComputedOrderId) and take only the rows from the Orders and Deliveries tables with an orderId greater than maxComputedOrderId:
from transforms.api import transform, Input, Output, incremental
from pyspark.sql import types as T
from pyspark.sql import functions as F


@incremental(snapshot_inputs=['orders', 'deliveries'])
@transform(
    orders=Input('/example/Orders'),
    deliveries=Input('/example/Deliveries'),
    delivery_duration=Output('/example/New_Delivery_Date')
)
def compute_delivery_duration(orders, deliveries, delivery_duration):

    def to_fields(datatype, names, nullable=True):
        return [T.StructField(n, datatype, nullable) for n in names]

    # Generate a schema to pass for deliveryDuration
    fields = to_fields(T.IntegerType(), ['orderId', 'deliveryDuration'])

    # Explicitly define the schema as you can't refer to the previous version's schema
    maxComputedOrderId = (
        delivery_duration
        .dataframe('previous', schema=T.StructType(fields))
        .groupby()
        .max('orderId')
        .collect()[0][0]
    )

    # At the first iteration, maxComputedOrderId is None because the delivery_duration dataset doesn't exist yet
    if maxComputedOrderId is None:
        maxComputedOrderId = 0

    ordersNotProcessed = orders.dataframe().filter(F.col('orderId') > maxComputedOrderId)
    deliveriesNotProcessed = deliveries.dataframe().filter(F.col('orderId') > maxComputedOrderId)

    newDurations = (
        ordersNotProcessed
        .join(deliveriesNotProcessed, 'orderId', how='left')
        .withColumn('deliveryDuration', F.datediff(F.col('deliveryDate'), F.col('submittedDate')))
        .drop(*['submittedDate', 'deliveryDate'])
    )

    delivery_duration.write_dataframe(newDurations)
Let's say that, from now on, we would like to add another column to our incremental dataset. Adding a column to the output won't invalidate the is_incremental flag, so the next run will still process only the new rows and write them with the new column; the column will be null in all rows written previously.
However, we might want to populate the column for previous rows as well. Increasing the semantic_version of the transform will make it run non-incrementally once; with the read mode of 'added', the input will then contain all of the data, enabling you to recompute every row and populate the new column.
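For example, reusing the filter transform from earlier, bumping the semantic version is a one-argument change to the incremental() decorator; the exact value does not matter as long as it differs from the value used previously:

@incremental(semantic_version=2)
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(students, processed):
    new_students_df = students.dataframe()
    processed.write_dataframe(
        new_students_df.filter(new_students_df.hair == 'Brown')
    )

After that one non-incremental run completes, subsequent builds run incrementally again.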
If your transform has been creating a historical dataset from snapshot input, then it becomes slightly more complicated, as the previous data is in a stack of snapshot transactions on your input. In this case, contact your Palantir representative.
In this example, we discussed adding a new column, but the above reasoning applies to all sorts of logic changes.
Creating a new branch and running a build on it will run the build incrementally: the last transaction committed on the branch you branched off from is treated as the previous transaction for the first build on the new branch.
We saw how to process data incrementally: by appending to the output (append-only transforms), by referring to the previous output (de-duplication), and by merging with the previous output and replacing it (aggregation). We also explored how to parse files from a schemaless input, join two incremental datasets, handle logic changes such as adding a new column, and work with incremental builds on branches.
To understand incremental errors, it is helpful, and sometimes necessary, to be familiar with the concepts of transactions and dataset views.
When a job runs incrementally, its incremental input datasets only consist of the unprocessed transactions range, not the full dataset view.
Imagine the following transaction history for a dataset:
SNAPSHOT (1) --> UPDATE (2) --> UPDATE (3) --> UPDATE (4) --> UPDATE (5)
                                        |
                           Last processed transaction
The last time the dataset was built, the latest transaction was (3). Since then, transactions (4) and (5) have been committed, therefore the unprocessed transaction range is (4) — (5).
The dataset view is the transaction range (1) — (5). The transaction “on top” of the view (the most recent) is sometimes referred to as the branch’s HEAD (again by analogy with git). As in git, a branch is a pointer to a transaction, so we say that the branch points at transaction (5). Different branches can point at different transactions, and branches may share a transaction history:
SNAPSHOT (1) ─> UPDATE (2) ─> UPDATE (3) ─> UPDATE (4) ─> UPDATE (5) [develop]
             |
             └─> UPDATE [feature-branch]
Catalog:TransactionsNotInView
In order for the job to run incrementally, a series of checks is run at the beginning of the job. One of these checks verifies that the unprocessed transaction range is strictly incremental (i.e., append-only file changes; see the requirements for incremental computation). It does so by comparing the files in the unprocessed transaction range with those in the processed transaction range.
However, if the branch’s HEAD has been moved, the incremental job is now in an inconsistent state: it no longer makes sense to compare the two ranges, so a Catalog:TransactionsNotInView error is thrown.
See the diagram below for how this error can occur:
SNAPSHOT (1) ─> UPDATE (2) ─> UPDATE (3) ─> UPDATE (4) ─> UPDATE (5)
             |                        |                           |
             |          (last processed transaction)   (branch's previous HEAD, now orphan)
             |
             └─> UPDATE (6) ─> UPDATE (7, branch's current HEAD)
Here the processed transaction range is (1) — (3), the current branch's HEAD points at (7), and the current view consists of transactions (1), (2), (6), and (7). This is an inconsistent state because not all processed transactions are upstream of the branch’s HEAD: indeed, (3) is not. In other words, the last processed transaction (3) is no longer part of the current view, which is why Catalog:TransactionsNotInView is thrown.
Catalog:InconsistentBranchesOrder
The other Catalog error that can be thrown is Catalog:InconsistentBranchesOrder, which occurs when the last processed transaction (prevTransaction) is greater than the branch’s HEAD (endTransaction). This can happen if the HEAD of the dataset is moved to a transaction that comes before the last processed transaction.
See the diagram below for how this error can occur:
SNAPSHOT (1) --> UPDATE (2) --> UPDATE (3) --> UPDATE (4) --> UPDATE (5)
                                        |                             |
                                  Current HEAD           Last processed transaction
A branch’s HEAD can change for two reasons:
- A new transaction is committed on the branch. New transactions are committed either on top of the current HEAD or as a SNAPSHOT, so they cannot lead to an inconsistent state.
- The branch’s HEAD is explicitly moved to point at a different transaction. This is what can lead to the errors described above.
In order to remediate this, you will need to either:
- Force the next build to run non-incrementally, for example by increasing the semantic_version of the transform, or
- Use the updateBranch2 endpoint with the latest processed transaction as the parentRef. Note that we only recommend the use of this endpoint for experienced users.