How do I fetch data from an API and update it incrementally using external transforms?
This code uses PySpark and the requests library to fetch data from an API within a specified date range and update the output incrementally. It also supports pagination when the API provides it.
from pyspark.sql import functions as F
from transforms.api import incremental, transform, Output
import requests
from transforms.external.systems import EgressPolicy, use_external_systems, Credential
import logging
from datetime import datetime as dt
import json

# Placeholder column names used to track pagination and incremental state;
# adjust them to match the columns you actually store in the output dataset.
NEXT_LINK_COLUMN = "next_link"
LAST_MODIFIED_COLUMN = "date"
REQUEST_TIMESTAMP_COLUMN = "request_timestamp"


def _get_data(token, start_date, end_date, next_link_url='<YOUR_URL>'):
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"}
    data = {
        "from": start_date,
        "to": end_date,
    }
    response = requests.post(next_link_url, json=data, headers=headers)
    logging.warning(response.json())
    data = response.json()["data"]
    return json.dumps(data)


@use_external_systems(
    creds=Credential(),
    egress=EgressPolicy(),
)
@incremental()
@transform(
    output=Output(),
)
def compute(output, creds, egress, ctx):
    token = creds.get("token")
    # Defaults for the first (non-incremental) run
    next_link_url = '<YOUR_URL>'
    last_date = '<YOUR_INITIAL_START_DATE>'
    if ctx.is_incremental:
        previous = output.dataframe('current').localCheckpoint()
        if NEXT_LINK_COLUMN in previous.columns:
            latest_row = (
                previous
                .where(F.col(LAST_MODIFIED_COLUMN).isNotNull())
                .orderBy([F.col(REQUEST_TIMESTAMP_COLUMN).desc(), F.col(LAST_MODIFIED_COLUMN).desc()])
                .limit(1)
                .collect()[0]
            )
            next_link_url = latest_row[NEXT_LINK_COLUMN]
            last_date = latest_row[LAST_MODIFIED_COLUMN]
        else:
            last_date = (
                previous
                .orderBy(F.col(LAST_MODIFIED_COLUMN).desc())
                .limit(1)
                .collect()[0][LAST_MODIFIED_COLUMN]
            )
    today = dt.today().strftime("%Y-%m-%d")
    data = _get_data(token, last_date, today, next_link_url)
    df = ctx.spark_session.createDataFrame([{'date': last_date, 'data': data}])
    output.set_mode("modify")
    output.write_dataframe(df)
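The snippet above stores a next_link_url for the following run but does not itself loop over pages within a single build. Below is a minimal sketch of what an in-build paging loop could look like, assuming the API returns records under a data key and the next page under a next key; the helper and both key names are hypothetical and must be adapted to the actual API contract.

import requests


def _fetch_all_pages(token, start_date, end_date, first_url):
    # Hypothetical helper: follow the API's "next" link until no further page is returned.
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"}
    payload = {"from": start_date, "to": end_date}
    url = first_url
    all_records = []
    while url:
        response = requests.post(url, json=payload, headers=headers)
        response.raise_for_status()
        body = response.json()
        all_records.extend(body.get("data", []))  # "data" is an assumed response key
        url = body.get("next")                    # "next" is an assumed pagination key
    return all_records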
API, pyspark, incremental, dataframe, external transform
How can I change the inputs of an incremental transform depending on whether it is running incrementally or not?
from transforms.api import transform, incremental, Input, Output


@incremental()
@transform(
    x=Output(),
    y=Input(),
    z=Input()
)
def compute(ctx, x, y, z):
    if ctx.is_incremental:
        pass  # Some code to run when the build is incremental
    else:
        pass  # Other code to run when the build is not incremental
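To make the branch concrete, the sketch below reads from a different input depending on the mode. The dataset paths and the assumption that y is a small delta feed while z is a full history are placeholders for illustration, not the only way to structure such a transform.

from transforms.api import transform, incremental, Input, Output


@incremental()
@transform(
    x=Output("/path/to/output"),        # placeholder paths
    y=Input("/path/to/delta_feed"),
    z=Input("/path/to/full_history")
)
def compute(ctx, x, y, z):
    if ctx.is_incremental:
        # Incremental run: process only the newly added rows of the delta feed
        df = y.dataframe()
        x.set_mode("modify")
    else:
        # Snapshot run: rebuild the output from the full history
        df = z.dataframe()
        x.set_mode("replace")
    x.write_dataframe(df)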
code authoring, code repositories, python, incremental
How do I optimize a non-incremental pipeline with the incremental decorator to perform incremental aggregation on a very large dataset?
This code uses PySpark to compute daily aggregates of a dataset by grouping the dataset based on a specific field and counting the distinct values of another field. The code then stores the result in an output dataframe, handling both incremental and non-incremental cases.
from pyspark.sql import functions as F
from transforms.api import transform, incremental, Input, Output


@incremental(semantic_version=1)
@transform(
    # input dataset
    input_data=Input(""),
    daily_aggregate=Output("")
)
def compute(ctx, input_data, daily_aggregate):
    input_df = input_data.dataframe()
    latest_daily_agg = (
        input_df
        .groupBy(F.col("group_by_field"))
        .agg(F.count_distinct(F.col("unique_thing")).alias("sum_of_unique"))
    )
    # You need a schema to load a previous output.
    latest_daily_agg_schema = latest_daily_agg.schema

    if ctx.is_incremental:
        last_daily_agg = daily_aggregate.dataframe(mode='previous', schema=latest_daily_agg_schema)
        sum_daily = (
            last_daily_agg
            .unionByName(latest_daily_agg)
            .groupBy(F.col("group_by_field"))
            .agg(F.sum(F.col("sum_of_unique")).alias("sum_of_unique"))
        )
        daily_aggregate.set_mode('replace')
        daily_aggregate.write_dataframe(sum_daily)
    else:
        # Not incremental - just store the latest daily aggregate
        daily_aggregate.write_dataframe(latest_daily_agg)
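To illustrate what the incremental branch computes, here is a minimal standalone sketch of the merge step; the local SparkSession, group names, and counts are illustrative only and are not part of the Foundry transform.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Totals written by earlier builds (hypothetical values)
previous_totals = spark.createDataFrame(
    [("group_a", 10), ("group_b", 4)],
    ["group_by_field", "sum_of_unique"],
)

# Aggregate computed over only the rows appended in the current build
latest_batch = spark.createDataFrame(
    [("group_a", 3), ("group_c", 1)],
    ["group_by_field", "sum_of_unique"],
)

merged = (
    previous_totals
    .unionByName(latest_batch)
    .groupBy("group_by_field")
    .agg(F.sum("sum_of_unique").alias("sum_of_unique"))
)
merged.show()  # group_a -> 13, group_b -> 4, group_c -> 1

Note that this pattern sums per-batch distinct counts: a value that appears again in a later batch is counted again, so the result is not a global distinct count across all batches.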
code authoring, code repositories, python, incremental, aggregation
How do I implement an incremental transform in Palantir Foundry using PySpark?
This code demonstrates how to use the incremental decorator in a PySpark transform to handle incremental processing. It adds a processed_at timestamp column and an is_running_as_incremental column to the input dataframe.
from transforms.api import transform_df, Input, Output, incremental
from pyspark.sql import functions as F


# Apply the incremental decorator
@incremental()
@transform_df(
    Output(""),
    input_df=Input("")
)
def example_transform_incremental_processing(ctx, input_df):
    # The behavior of the transform depends on the state of the inputs/outputs.
    # Nothing changes in the logic below compared to snapshot processing;
    # the incremental behavior is taken care of by Foundry.

    # Example processing here
    input_df = input_df.withColumn('processed_at', F.current_timestamp())

    # Use ctx.is_incremental to know if Foundry is running the current build
    # as a snapshot or as an incremental build
    input_df = input_df.withColumn('is_running_as_incremental', F.lit(ctx.is_incremental))

    # Return the edited dataframe
    return input_df
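If the transform logic later changes and the output needs to be recomputed from scratch, changing the semantic_version argument of the decorator forces the next build to run as a snapshot. A minimal sketch, where the function name and version number are illustrative:

from transforms.api import transform_df, Input, Output, incremental
from pyspark.sql import functions as F


# Bumping semantic_version (e.g. from 1 to 2) makes the next build run
# non-incrementally, recomputing the whole output with the new logic.
@incremental(semantic_version=2)
@transform_df(
    Output(""),
    input_df=Input("")
)
def example_transform_incremental_processing_v2(ctx, input_df):
    return input_df.withColumn('processed_at', F.current_timestamp())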
code authoring, code repositories, python, incremental
How can I do an incremental transform on a snapshot input that is fully rewritten and only process new rows I haven't processed before?
This code uses PySpark to read an input dataset as a snapshot and compare it with the previous output to find new rows. It then adds a timestamp to the new rows and appends them to the output dataset, performing an incremental transformation.
from transforms.api import incremental, transform, Input, Output
from pyspark.sql import types as T, functions as F


# @incremental decorator to get advanced read/write modes.
@incremental(
    snapshot_inputs=["input_dataset"]
)
# @transform to have more control over inputs and outputs.
@transform(
    output_dataset=Output("incremental_output"),
    input_dataset=Input("snapshot_input")
)
def example_transform_very_advanced_processing__snapshot_to_incremental(ctx, input_dataset, output_dataset):
    # We enforce the read of the input dataframe as a snapshot
    input_df_all_dataframe = input_dataset.dataframe(mode="current")

    # We read the current output to see what we already processed in previous builds
    out_schema = T.StructType([
        T.StructField("primary_key", T.StringType(), True),
        T.StructField("other_column", T.IntegerType(), True),
        # Include other columns you store in your output
        T.StructField("processed_at", T.TimestampType(), True)
    ])
    output_df_previous_dataframe = output_dataset.dataframe('previous', out_schema)

    # ==== Example processing here ====
    # We diff the input with the current output to find the "new rows".
    KEY = ["primary_key"]
    new_rows_df = input_df_all_dataframe.join(output_df_previous_dataframe, how="left_anti", on=KEY)

    # We add a timestamp for easier tracking/debugging/understanding of the example
    new_rows_df = new_rows_df.withColumn('processed_at', F.current_timestamp())
    # ==== End of example processing ====

    # This will append rows
    output_dataset.set_mode("modify")
    output_dataset.write_dataframe(new_rows_df)
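The key step above is the left_anti join, which keeps only the input rows whose primary_key is not yet present in the previous output. A minimal standalone sketch with toy data, where the local SparkSession and values are illustrative only:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

snapshot_input = spark.createDataFrame(
    [("a", 1), ("b", 2), ("c", 3)],
    ["primary_key", "other_column"],
)
previous_output = spark.createDataFrame(
    [("a", 1), ("b", 2)],
    ["primary_key", "other_column"],
)

# left_anti keeps rows of the left side that have no match on the right,
# i.e. only the rows that were not processed in previous builds.
new_rows = snapshot_input.join(previous_output, how="left_anti", on=["primary_key"])
new_rows.show()  # only ("c", 3) remains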
code authoring, code repositories, python, incremental