Code examplesIncremental transformsTransforms

Transforms

Python

Fetch and update data incrementally from API using PySpark

How do I fetch data from an API and update it incrementally using external transforms?

This code uses PySpark and the requests library to fetch data from an API between a specified date range and update the output incrementally. It additionally supports paging if the API also supports paging.

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 from pyspark.sql import functions as F from transforms.api import incremental, transform, Output import requests from transforms.external.systems import EgressPolicy, use_external_systems, Credential import logging from datetime import datetime as dt import json def _get_data(token, start_date, end_date, next_link_url='<YOUR_URL>'): headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"} data = { "from": start_date, "to": end_date, } response = requests.post(next_link_url, json=data, headers=headers) logging.warn(response.json()) data = response.json()["data"] return json.dumps(data) @use_external_systems( creds=Credential(), egress=EgressPolicy(), ) @incremental() @transform( output=Output(), ) def compute(output, creds, egress, ctx): token = creds.get("token") if ctx.is_incremental: previous = output.dataframe('current').localCheckpoint() if NEXT_LINK_COLUMN in previous.columns: latest_row = ( previous .where(F.col(LAST_MODIFIED_COLUMN).isNotNull()) .orderBy([F.col(REQUEST_TIMESTAMP_COLUMN).desc(), F.col(LAST_MODIFIED_COLUMN).desc()]) .limit(1).collect()[0] ) next_link_url = latest_row[NEXT_LINK_COLUMN] last_date = latest_row[LAST_MODIFIED_COLUMN] else: last_date = previous.orderBy(F.col(LAST_MODIFIED_COLUMN).desc()).limit(1).collect()[0][LAST_MODIFIED_COLUMN] today = dt.today().strftime("%Y-%m-%d") data = _get_data(token, last_date, today, next_link_url) df = ctx.spark_session.createDataFrame([{'date': last_date, 'data': data}]) output.set_mode("modify") output.write_dataframe(df)
  • Date submitted: 2024-04-26
  • Tags: API, pyspark, incremental, dataframe, external transform

Incremental helper

How can I change inputs of an incremental transform depending on whether it is running incrementally or not?

Copied!
1 2 3 4 5 6 7 8 9 10 11 @incremental() @transform( x=Output(), y=Input(), z=Input() ) def compute(ctx, x, y, z): if ctx.is_incremental: ## Some Code else: ## Other Code
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python, incremental

Incremental sum aggregation

How do I optimize non-incremental pipelines with incremental decorators to do incremental aggregation on a very large dataset?

This code uses PySpark to compute daily aggregates of a dataset by grouping the dataset based on a specific field and counting the distinct values of another field. The code then stores the result in an output dataframe, handling both incremental and non-incremental cases.

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from pyspark.sql import functions as F from transforms.api import transform, incremental, Input, Output from pyspark.sql import DataFrame @incremental(semantic_version=1) @transform( # input dataset input_data=Input(""), daily_aggregate=Output("") ) def compute(ctx, input_data, daily_aggregate): input_df = input_data.dataframe() latest_daily_agg = input_df.groupBy(F.col("group_by_field")).agg(F.count_distinct(F.col("unique_thing")).alias("sum_of_unique")) # you need a schema to load a previous output. latest_daily_agg_schema = latest_daily_agg.schema if ctx.is_incremental: last_daily_agg = daily_aggregate.dataframe(mode='previous', schema=latest_daily_agg_schema) sum_daily = last_daily_agg.unionByName(latest_daily_agg).groupBy(F.col("group_by_field")).agg(F.sum(F.col("sum_of_unique")).alias("sum_of_unique")) daily_aggregate.set_mode('replace') daily_aggregate.write_dataframe(sum_daily) else: # not incremental - just store the latest daily daily_aggregate.write_dataframe(latest_daily_agg)
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python, incremental, aggregation

Incremental transform in PySpark

How to implement an incremental transform in Palantir Foundry using PySpark?

This code demonstrates how to use the incremental decorator in a PySpark transform to handle incremental processing. It adds a processed_at timestamp column and an is_running_as_incremental column to the input dataframe.

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from transforms.api import transform_df, Input, Output, incremental from pyspark.sql import types as T from pyspark.sql import functions as F # Apply the incremental decorator @incremental() @transform_df( Output(""), input_df=Input("") ) def example_transform_incremental_processing(ctx, input_df): # The behavior of the transform depends on the state of the inputs/outputs # (See cases 1 to 3 in the original code snippet for details) # # Nothing changes in the logic below compared to a snapshot processing. # The behavior is taken care of by Foundry. # Example processing here input_df = input_df.withColumn('processed_at', F.current_timestamp()) # Use ctx.is_incremental to know if Foundry is running the current build as "a snapshot" or as "an incremental" input_df = input_df.withColumn('is_running_as_incremental', F.lit(ctx.is_incremental)) # Returns the edited dataframe return input_df
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python, incremental

Snapshot input to incremental output

How can I do an incremental transform on a snapshot input that is fully rewritten and only process new rows I haven't process before?

This code uses PySpark to read an input dataset as a snapshot and compares it with the previous output to find new rows. It then adds a timestamp to the new rows and appends them to the output dataset, performing an incremental transformation.

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 from transforms.api import incremental, transform, Input, Output from pyspark.sql import types as T, functions as F # @incremental decorator to get advanced read/write modes. @incremental( snapshot_inputs=["input_dataset"] ) # @transform to have more control over inputs and outputs. @transform( output_dataset=Output("incremental_output"), input_dataset=Input("snapshot_input") ) def example_transform_very_advanced_processing__snapshot_to_incremental(ctx, input_dataset, output_dataset): # We enforce the read of the input dataframe as a snapshot input_df_all_dataframe = input_dataset.dataframe(mode="current") # We read the current output to see what we already processed in previous builds out_schema = T.StructType([T.StructField("primary_key", T.StringType(), True), T.StructField("other_column", T.IntegerType(), True), # Include other columns you store in your output T.StructField("processed_at", T.TimestampType(), True)]) output_df_previous_dataframe = output_dataset.dataframe('previous', out_schema) # ==== Example processing here ==== # We diff the input with the current output, to find the "new rows". KEY = ["primary_key"] new_rows_df = input_df_all_dataframe.join(output_df_previous_dataframe, how="left_anti", on=KEY) # We add a timestamp for easier tracking/debugging/understanding of the example new_rows_df = new_rows_df.withColumn('processed_at', F.current_timestamp()) # ==== End of example processing ==== # This will append rows output_dataset.set_mode("modify") output_dataset.write_dataframe(new_rows_df)
  • Date submitted: 2024-07-18
  • Tags: code authoring, code repositories, python, incremental