If at all possible, it is best practice for datasets of this type to be ingested as APPEND transactions from the start. See the Warnings below for additional details.
Occasionally you will have a raw dataset where each day/week/hour a new SNAPSHOT import replaces the previous view of the dataset with the current data. However, it is often useful to keep the previous data available as well, so you can determine what has changed from one view to the next. As mentioned above, the best practice in this case is to handle this as part of the ingestion by using APPEND transactions and adding a column with the import date. In cases where this is not possible, you can use the incremental() decorator in Python transforms to append these regular SNAPSHOTs into a historical version of the dataset. See the Warnings below for the caveats around the fragility of this approach.
```python
from transforms.api import transform, Input, Output, incremental
from pyspark.sql.functions import current_date


@incremental(snapshot_inputs=['input_data'])
@transform(
    input_data=Input("/path/to/snapshot/input"),
    history=Output("/path/to/historical/dataset"),
)
def my_compute_function(input_data, history):
    input_df = input_data.dataframe()
    # note that you can also use current_timestamp() below
    # if the input will change > 1x/day
    input_df = input_df.withColumn('date', current_date())
    history.write_dataframe(input_df)
```
The incremental decorator applies additional logic to the read/write modes on the inputs and output. In the example above, we use the default read/write modes for the input and output.
When using a SNAPSHOT input, the default read mode is current, which means the transform reads the entire input dataframe, not just the rows added since the last transaction. If the input dataset was created from APPEND transactions, however, we could use the incremental() decorator with the added read mode to access only the rows added since the last build.
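For illustration, here is a minimal sketch of that added read mode, assuming a hypothetical input dataset built with APPEND transactions (the paths and names below are placeholders, not part of the example above):

```python
from transforms.api import transform, Input, Output, incremental


@incremental()
@transform(
    history=Output("/examples/filtered_history"),  # hypothetical path
    input_data=Input("/examples/append_input"),    # hypothetical APPEND-built dataset
)
def compute(input_data, history):
    # 'added' reads only the rows appended to the input since the last
    # successful incremental build; 'current' would return the entire view.
    new_rows = input_data.dataframe('added')
    # The default write mode is 'modify', so these rows are appended
    # to the existing output.
    history.write_dataframe(new_rows)
```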
The transform obtains schema information from the current output, so there is no need to pass schema information like you would if you were reading a previous version of the dataframe (e.g., dataframe('previous', schema=input.schema)).
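As a sketch of that difference, assuming hypothetical dataset paths and a hypothetical column layout: reading the previous version of the output requires an explicit schema, while reading the current view of the input does not.

```python
from transforms.api import transform, Input, Output, incremental
from pyspark.sql import types as T

# Hypothetical schema matching the columns already written to the output
previous_schema = T.StructType([
    T.StructField('id', T.StringType()),
    T.StructField('value', T.DoubleType()),
])


@incremental(snapshot_inputs=['input_data'])
@transform(
    history=Output("/examples/history"),           # hypothetical path
    input_data=Input("/examples/snapshot_input"),  # hypothetical path
)
def compute(input_data, history):
    # Current view of the input: the schema is inferred automatically.
    current_df = input_data.dataframe()

    # Previous version of the output: an explicit schema must be supplied.
    previous_df = history.dataframe('previous', previous_schema)

    # Replace the output so previously written rows are not appended twice.
    history.set_mode('replace')
    history.write_dataframe(current_df.unionByName(previous_df))
```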
When we say a transform is run incrementally, it means that the default write mode for the output is set to modify. This mode modifies the existing output with the data written during the build. For example, calling write_dataframe() when the output is in modify mode will append the written dataframe to the existing output, which is exactly what happens in the example above.
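To make that default explicit, here is a small sketch (again with placeholder paths) that sets the write mode by hand; calling set_mode('replace') instead would keep only the rows written during the current build:

```python
from transforms.api import transform, Input, Output, incremental


@incremental(snapshot_inputs=['input_data'])
@transform(
    history=Output("/examples/history"),           # hypothetical path
    input_data=Input("/examples/snapshot_input"),  # hypothetical path
)
def compute(input_data, history):
    # 'modify' is already the default when the transform runs incrementally;
    # setting it explicitly documents that each build appends to the output.
    history.set_mode('modify')
    history.write_dataframe(input_data.dataframe())
```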
Because this transform uses SNAPSHOT datasets as inputs, there is no way to recover a snapshot your build may have missed (due to build failures or other reasons). If this is a concern, do not use this method. Instead, contact the owner of the input dataset to see whether it could be converted to an APPEND dataset so that you can access the dataset's previous transactions. That is the way incremental computation was designed to work.
This will fail if the schema of the input dataset changes (for example, a column's type changes from integer to decimal).

Using this pattern can also cause an accumulation of small files in the historical dataset. File accumulation is not a desired outcome and will lead to increased build times and resource consumption in downstream transforms or analyses that use this historical dataset. Batch and interactive compute time may increase because there is overhead associated with reading each file. Disk usage may increase because compression is applied on a per-file basis, not across files within a dataset. It is possible to build logic that periodically re-snapshots the data to prevent this from happening.
By inspecting the number of output files, we can choose the appropriate incremental write mode for each build. When the file count grows too large, we read the previous transaction's output as a dataframe, union it with the incoming data, and coalesce the data files together, turning many small files into one larger file.
Inspect the number of files in the output dataset's file system and use an if statement to set the write mode, as in the following example:
```python
from transforms.api import transform, Input, Output, configure, incremental
from pyspark.sql import types as T

FILE_COUNT_LIMIT = 100

# Be sure to insert your desired output schema here
schema = T.StructType([
    T.StructField('Value', T.DoubleType()),
    T.StructField('Time', T.TimestampType()),
    T.StructField('DataGroup', T.StringType())
])


def compute_logic(df):
    """
    This is your transforms logic
    """
    # Placeholder: replace with your own transformation logic
    return df


@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@incremental(semantic_version=1)
@transform(
    output=Output("/Org/Project/Folder1/Output_dataset_incremental"),
    input_df=Input("/Org/Project/Folder1/Input_dataset_incremental"),
)
def compute(input_df, output):
    df = input_df.dataframe('added')
    df = compute_logic(df)

    # Fetch the list of files in the output dataset as of the previous transaction
    files = list(output.filesystem(mode='previous').ls())

    if len(files) > FILE_COUNT_LIMIT:
        # Incremental merge and replace: union the previous output with the
        # new data and rewrite the whole dataset as a fresh snapshot
        previous_df = output.dataframe('previous', schema)
        df = df.unionByName(previous_df)
        mode = 'replace'
    else:
        # Standard incremental mode: append the new data to the output
        mode = 'modify'

    output.set_mode(mode)
    output.write_dataframe(df.coalesce(1))
```
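In this pattern, coalesce(1) writes a single file per incremental build, so the output gains roughly one file per run. FILE_COUNT_LIMIT controls how often those files are compacted: once the previous output contains more files than the limit, the build switches to the 'replace' mode, unions the previous output with the newly added rows, and rewrites everything as a fresh snapshot, resetting the file count. Tune the limit and the coalesce factor to your data volume; for very large outputs, writing a single file per snapshot may itself become slow.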