Incremental transforms

Transforms

Python

Fetch and update data incrementally from API using PySpark

How do I fetch data from an API and update it incrementally using external transforms?

This code uses PySpark and the requests library to fetch data from an API over a specified date range and update the output incrementally. It also supports paging when the API provides a next-link URL.

from pyspark.sql import functions as F
from transforms.api import incremental, transform, Output
import requests
from transforms.external.systems import EgressPolicy, use_external_systems, Credential
import logging
from datetime import datetime as dt
import json

# Column names used to track paging state in the output dataset.
# Adjust these to match the columns you actually write to the output.
NEXT_LINK_COLUMN = "next_link"
LAST_MODIFIED_COLUMN = "last_modified"
REQUEST_TIMESTAMP_COLUMN = "request_timestamp"


def _get_data(token, start_date, end_date, next_link_url='<YOUR_URL>'):
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"}

    payload = {
        "from": start_date,
        "to": end_date,
    }
    response = requests.post(next_link_url, json=payload, headers=headers)
    logging.warning(response.json())
    data = response.json()["data"]
    return json.dumps(data)


@use_external_systems(
    creds=Credential(),
    egress=EgressPolicy(),
)
@incremental()
@transform(
    output=Output(),
)
def compute(output, creds, egress, ctx):
    token = creds.get("token")

    # Defaults for the first (snapshot) run, before any previous output exists.
    next_link_url = '<YOUR_URL>'
    last_date = '<YOUR_INITIAL_START_DATE>'  # for example "1970-01-01"

    if ctx.is_incremental:
        previous = output.dataframe('current').localCheckpoint()
        if NEXT_LINK_COLUMN in previous.columns:
            # Resume paging from the most recent request.
            latest_row = (
                previous
                .where(F.col(LAST_MODIFIED_COLUMN).isNotNull())
                .orderBy([F.col(REQUEST_TIMESTAMP_COLUMN).desc(), F.col(LAST_MODIFIED_COLUMN).desc()])
                .limit(1).collect()[0]
            )
            next_link_url = latest_row[NEXT_LINK_COLUMN]
            last_date = latest_row[LAST_MODIFIED_COLUMN]
        else:
            # No paging state stored; resume from the latest modification date only.
            last_date = previous.orderBy(F.col(LAST_MODIFIED_COLUMN).desc()).limit(1).collect()[0][LAST_MODIFIED_COLUMN]

    today = dt.today().strftime("%Y-%m-%d")

    data = _get_data(token, last_date, today, next_link_url)

    df = ctx.spark_session.createDataFrame([{'date': last_date, 'data': data}])
    output.set_mode("modify")  # append the new rows to the existing output
    output.write_dataframe(df)
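As written, the rows written to the output only contain date and data, so the NEXT_LINK_COLUMN branch above never finds paging state to resume from. A minimal sketch of how the output row could also carry that state, assuming the API returns a next-link field (the field name and helper below are illustrative, not part of the original example):

# Sketch (assumption): build the output row so it also stores the paging state
# that the incremental branch looks for on the next run.
def _build_state_row(response_json, last_date):
    return {
        "data": json.dumps(response_json["data"]),
        LAST_MODIFIED_COLUMN: last_date,
        # Hypothetical "next" field; adjust to your API's paging contract.
        NEXT_LINK_COLUMN: response_json.get("next", "<YOUR_URL>"),
        REQUEST_TIMESTAMP_COLUMN: dt.now().isoformat(),
    }

With this variation, _get_data would return the full response JSON rather than only the "data" field, and compute would write ctx.spark_session.createDataFrame([_build_state_row(response_json, last_date)]).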
  • Date submitted: 2024-04-26
  • Tags: API, pyspark, incremental, dataframe, external transform

Incremental helper

How can I change inputs of an incremental transform depending on whether it is running incrementally or not?

from transforms.api import incremental, transform, Input, Output


@incremental()
@transform(
    x=Output(),
    y=Input(),
    z=Input()
)
def compute(ctx, x, y, z):
    if ctx.is_incremental:
        # Some code: runs when the build can be executed incrementally
        pass
    else:
        # Other code: runs when the build falls back to a full snapshot
        pass
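A more concrete sketch of the same pattern, assuming y carries new events, z is a reference table, and both share a join column (the dataset paths, the "key" column, and the read modes below are illustrative, not part of the original answer):

from transforms.api import incremental, transform, Input, Output


@incremental(snapshot_inputs=["z"])  # z is a reference table, always read in full
@transform(
    x=Output("/path/to/output"),          # illustrative path
    y=Input("/path/to/events"),           # illustrative path
    z=Input("/path/to/reference_table")   # illustrative path
)
def compute(ctx, x, y, z):
    reference_df = z.dataframe()  # full reference table in both cases

    if ctx.is_incremental:
        events_df = y.dataframe("added")    # only rows added since the last build
        x.set_mode("modify")                # append to the existing output
    else:
        events_df = y.dataframe("current")  # full input when running as a snapshot
        x.set_mode("replace")               # rewrite the output from scratch

    # "key" is a hypothetical join column shared by both datasets.
    x.write_dataframe(events_df.join(reference_df, on="key", how="left"))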
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python, incremental

Incremental sum aggregation

How do I use the incremental decorator to optimize a non-incremental pipeline so it aggregates a very large dataset incrementally?

This code uses PySpark to compute daily aggregates of a dataset by grouping on one field and counting the distinct values of another. The result is written to the output dataset, handling both the incremental and non-incremental cases.

from pyspark.sql import functions as F
from transforms.api import transform, incremental, Input, Output
from pyspark.sql import DataFrame


@incremental(semantic_version=1)
@transform(
    # input dataset
    input_data=Input(""),
    daily_aggregate=Output("")
)
def compute(ctx, input_data, daily_aggregate):
    input_df = input_data.dataframe()
    latest_daily_agg = input_df.groupBy(F.col("group_by_field")).agg(F.count_distinct(F.col("unique_thing")).alias("sum_of_unique"))
    # You need a schema to load a previous output.
    latest_daily_agg_schema = latest_daily_agg.schema

    if ctx.is_incremental:
        last_daily_agg = daily_aggregate.dataframe(mode='previous', schema=latest_daily_agg_schema)
        sum_daily = last_daily_agg.unionByName(latest_daily_agg).groupBy(F.col("group_by_field")).agg(F.sum(F.col("sum_of_unique")).alias("sum_of_unique"))
        daily_aggregate.set_mode('replace')
        daily_aggregate.write_dataframe(sum_daily)
    else:
        # Not incremental - just store the latest daily aggregate.
        daily_aggregate.write_dataframe(latest_daily_agg)
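A note on semantic_version: bumping it forces the next build to run non-incrementally, which is useful when the aggregation logic changes and the stored aggregate must be recomputed from the full input. A minimal sketch of the change:

# Changing the version number (for example from 1 to 2) invalidates the previous
# incremental state, so the next build recomputes the aggregate from scratch.
@incremental(semantic_version=2)
@transform(
    input_data=Input(""),
    daily_aggregate=Output("")
)
def compute(ctx, input_data, daily_aggregate):
    ...  # same aggregation logic as above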
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python, incremental, aggregation

Incremental transform in PySpark

How do I implement an incremental transform in Palantir Foundry using PySpark?

This code demonstrates how to use the incremental decorator in a PySpark transform to handle incremental processing. It adds a processed_at timestamp column and an is_running_as_incremental column to the input dataframe.

from transforms.api import transform_df, Input, Output, incremental
from pyspark.sql import types as T
from pyspark.sql import functions as F


# Apply the incremental decorator
@incremental()
@transform_df(
    Output(""),
    input_df=Input("")
)
def example_transform_incremental_processing(ctx, input_df):
    # The behavior of the transform depends on the state of the inputs/outputs:
    # when the build runs incrementally, input_df only contains the rows added
    # since the last build and the returned rows are appended to the output;
    # when it runs as a snapshot, input_df contains the full input and the
    # output is rewritten.
    #
    # Nothing changes in the logic below compared to a snapshot processing.
    # The behavior is taken care of by Foundry.

    # Example processing here
    input_df = input_df.withColumn('processed_at', F.current_timestamp())

    # Use ctx.is_incremental to know if Foundry is running the current build as "a snapshot" or as "an incremental"
    input_df = input_df.withColumn('is_running_as_incremental', F.lit(ctx.is_incremental))

    # Return the edited dataframe
    return input_df
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python, incremental

Snapshot input to incremental output

How can I run an incremental transform on a snapshot input that is fully rewritten each build, and only process rows I haven't processed before?

This code uses PySpark to read an input dataset as a snapshot, compare it with the previous output to find new rows, add a timestamp to those rows, and append them to the output dataset, so the output is built incrementally.

from transforms.api import incremental, transform, Input, Output
from pyspark.sql import types as T, functions as F


# @incremental decorator to get advanced read/write modes.
@incremental(
    snapshot_inputs=["input_dataset"]
)
# @transform to have more control over inputs and outputs.
@transform(
    output_dataset=Output("incremental_output"),
    input_dataset=Input("snapshot_input")
)
def example_transform_very_advanced_processing__snapshot_to_incremental(ctx, input_dataset, output_dataset):
    # We enforce the read of the input dataframe as a snapshot.
    input_df_all_dataframe = input_dataset.dataframe(mode="current")

    # We read the current output to see what we already processed in previous builds.
    out_schema = T.StructType([T.StructField("primary_key", T.StringType(), True),
                               T.StructField("other_column", T.IntegerType(), True),  # Include other columns you store in your output
                               T.StructField("processed_at", T.TimestampType(), True)])
    output_df_previous_dataframe = output_dataset.dataframe('previous', out_schema)

    # ==== Example processing here ====
    # We diff the input with the current output to find the "new rows".
    KEY = ["primary_key"]
    new_rows_df = input_df_all_dataframe.join(output_df_previous_dataframe, how="left_anti", on=KEY)
    # We add a timestamp for easier tracking/debugging/understanding of the example.
    new_rows_df = new_rows_df.withColumn('processed_at', F.current_timestamp())

    # ==== End of example processing ====

    # This will append rows.
    output_dataset.set_mode("modify")
    output_dataset.write_dataframe(new_rows_df)
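The left anti join above only catches rows whose primary_key has never been seen; rows updated in place keep their key and are skipped. A possible variation (an assumption on top of the original example, not part of it) is to diff on a hash of all columns so that changed rows are reprocessed as well, provided the output also stores that hash:

from pyspark.sql import functions as F


def find_new_or_changed_rows(input_df, previous_output_df):
    """Return input rows that are new or whose content changed since the previous build."""
    # Hash every column of the input; any change in a value changes the hash.
    hashed_input = input_df.withColumn(
        "row_hash",
        F.sha2(F.concat_ws("||", *[F.col(c).cast("string") for c in input_df.columns]), 256)
    )
    # The previous output must store the same row_hash column for this diff to work.
    return hashed_input.join(previous_output_df.select("row_hash"), on="row_hash", how="left_anti")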
  • Date submitted: 2024-07-18
  • Tags: code authoring, code repositories, python, incremental