How can I prevent or abort a transform from running if the input or source dataset is empty?
This code uses a PySpark transform to check whether the input DataFrame is empty before writing it to the output location. If the DataFrame is empty, the transaction is aborted so that no empty output is written.

from pyspark.sql import functions as F
from transforms.api import transform, Input, Output


@transform(
    out=Output("/Palantir/output_location/datasets/not_process_empty_files"),
    source_df=Input("/Palantir/input_location/sometimes_empty"),
)
def compute(source_df, out):
    source_df = source_df.dataframe()

    # Check whether there is at least one row to write
    if len(source_df.head(1)) == 0:
        # No rows to write, abort the transaction
        out.abort()
    else:
        # At least one row exists, so write (and hence append) the dataframe
        out.write_dataframe(source_df)
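A lighter variant of the same check is sketched below; it assumes Spark 3.3 or later, where DataFrame.isEmpty() is available, and reuses the placeholder dataset paths from above.

from transforms.api import transform, Input, Output


@transform(
    out=Output("/Palantir/output_location/datasets/not_process_empty_files"),
    source_df=Input("/Palantir/input_location/sometimes_empty"),
)
def compute(source_df, out):
    df = source_df.dataframe()

    # isEmpty() (Spark >= 3.3) avoids collecting a row to the driver
    if df.isEmpty():
        out.abort()
    else:
        out.write_dataframe(df)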
code authoring
, code repositories
, python
, abort
How do I create a dataframe inside a PySpark transform and write it to the output?
This code defines a helper that creates a DataFrame from scratch with a single row containing a dummy key and a current-timestamp column. A transform then writes the DataFrame to an output dataset path.
from transforms.api import transform, Output
from pyspark.sql import types as T
from pyspark.sql import functions as F


# Generate a single-row dataframe that records the current timestamp
def get_empty_df(ctx):
    # Define the schema for the DataFrame
    schema = T.StructType([T.StructField("key", T.StringType(), True)])

    # Create a DataFrame with a single row containing a dummy key
    df = ctx.spark_session.createDataFrame([("dummy_key",)], schema)

    # Add a new column 'when' with the current timestamp
    df = df.withColumn('when', F.current_timestamp())
    return df


@transform(
    out=Output("output_dataset_path")
)
def out_1(ctx, out):
    # Create the dataframe
    df = get_empty_df(ctx)

    # Write the dataframe to the output
    out.write_dataframe(df)
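If you need more than one column, the same pattern extends to an explicit multi-field schema. The sketch below is illustrative only; the function name, column names, sample rows, and output path are placeholders.

from transforms.api import transform, Output
from pyspark.sql import types as T


@transform(
    out=Output("output_dataset_path")
)
def build_seed_dataset(ctx, out):
    # Explicit schema with several typed fields
    schema = T.StructType([
        T.StructField("id", T.IntegerType(), False),
        T.StructField("name", T.StringType(), True),
        T.StructField("score", T.DoubleType(), True),
    ])

    # Hypothetical seed rows matching the schema above
    rows = [
        (1, "alice", 0.9),
        (2, "bob", 0.7),
    ]

    df = ctx.spark_session.createDataFrame(rows, schema)
    out.write_dataframe(df)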
code authoring
, code repositories
, python
How do I perform fuzzy matching of entity names using phonetic codes in PySpark?
This code uses PySpark to clean entity names, generate phonetic codes, and perform fuzzy matching of entity names using the Jaro similarity metric. It is useful for matching similar entity names in two datasets.
from pyspark.sql import functions as F
from pyspark.sql import types as T
from transforms.api import transform_df, Input, Output
import re
import jellyfish


def _add_phonetic_codes(df):
    # Generate a phonetic code for each part of the name
    df = df.withColumn(
        "name_part", F.split("cleaned_name", " ")
    ).withColumn(
        "name_part", F.explode("name_part")
    ).withColumn(
        "phonetic_code", F.soundex("name_part")
    ).drop("name_part")
    return df


@transform_df(
    Output(),
    entities2=Input(),
    entities1=Input(),
)
def compute(entities2, entities1):
    # Set up UDF for cleaning text
    def clean_text(text):
        cleaned_text = re.sub(r" +", " ", re.sub(r"[./-]+", "", text)).lower()
        return cleaned_text

    clean_text_udf = F.udf(clean_text, T.StringType())

    # Clean entity names
    entities2 = entities2.withColumn("cleaned_name", clean_text_udf(F.col("name")))
    entities1 = entities1.withColumn("cleaned_name", clean_text_udf(F.col("entity_name")))

    # Add phonetic codes
    entities2 = _add_phonetic_codes(entities2)
    entities1 = _add_phonetic_codes(entities1)

    # Fuzzy join on the phonetic code
    matched_entities = entities1.join(
        entities2, on=["phonetic_code"], how="inner"
    ).select(
        entities1.cleaned_name.alias("cleaned_name1"),
        entities1.id.alias("entity_id1"),
        entities2.cleaned_name.alias("cleaned_name2"),
        entities2.id.alias("entity_id2")
    )
    matched_entities = matched_entities.dropDuplicates()

    # Set up UDF for string comparison
    @F.udf(T.DoubleType())
    def jaro_compare(name1, name2):
        return jellyfish.jaro_similarity(name1, name2)

    # Fuzzy matching: keep pairs whose Jaro similarity exceeds 0.75
    matched_entities = matched_entities.withColumn(
        "match_score", jaro_compare("cleaned_name1", "cleaned_name2")
    )
    matched_entities = matched_entities.filter(F.col("match_score") > 0.75)
    matched_entities = matched_entities.select("entity_id1", "entity_id2")
    return matched_entities
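To see what the phonetic blocking and the Jaro score each contribute, the short standalone sketch below calls jellyfish directly on a pair of made-up example names; the sample strings are purely illustrative.

import jellyfish

name1 = "acme holdings"
name2 = "acme holding"

# Soundex assigns the same code to similar-sounding tokens,
# which is what makes the phonetic join a useful blocking step
print(jellyfish.soundex("holdings"), jellyfish.soundex("holding"))  # e.g. H435 H435

# Jaro similarity then scores the full cleaned names; values near 1.0 are close matches
print(jellyfish.jaro_similarity(name1, name2))  # roughly 0.97, above the 0.75 threshold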
pyspark
, fuzzy matching
, phonetic codes
, jaro similarity
How do I load an ORC file using PySpark?
This code reads raw ORC files from the Hadoop path of an input dataset and writes the resulting Spark DataFrame to the output.

from transforms.api import transform, Input, Output


@transform(
    out=Output("output"),
    raw=Input("input"),
)
def compute(ctx, out, raw):
    hadoop_path = raw.filesystem().hadoop_path
    df = ctx.spark_session.read.format('orc').load(f'{hadoop_path}/')
    out.write_dataframe(df)
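A minor variant is sketched below for the case where the ORC files in the input dataset may have drifted schemas; it assumes Spark 3.0 or later, where the ORC reader accepts a mergeSchema option.

from transforms.api import transform, Input, Output


@transform(
    out=Output("output"),
    raw=Input("input"),
)
def compute(ctx, out, raw):
    hadoop_path = raw.filesystem().hadoop_path

    # mergeSchema reconciles column sets across ORC files with different schemas
    df = (
        ctx.spark_session.read
        .format('orc')
        .option('mergeSchema', 'true')
        .load(f'{hadoop_path}/')
    )
    out.write_dataframe(df)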
pyspark
, dataframe
, orc
, hadoop
How do I create a multi-input, multi-output transform in Foundry?
This code demonstrates how to create a PySpark transform that takes multiple input datasets and produces multiple output datasets. It uses the @transform decorator and explicitly names the inputs and outputs. The transform reads the input dataframes, processes them, and writes the results to the specified output datasets.
from transforms.api import transform, Input, Output, incremental
from pyspark.sql import types as T
from pyspark.sql import functions as F


# Works with or without the @incremental decorator.
# @transform is used instead of @transform_df: it gives more control
# over inputs and outputs, and it requires outputs to be named explicitly.
@transform(
    output_dataset_1=Output("3_multi_output_1"),
    output_dataset_2=Output("3_multi_output_2"),
    input_dataset_1=Input("fake_dataset"),
    input_dataset_2=Input("fake_dataset_2")
)
def example_transform_multi_inputs_outputs(input_dataset_1, input_dataset_2, output_dataset_1, output_dataset_2):
    # Read the inputs into dataframes
    input_df_1 = input_dataset_1.dataframe()
    input_df_2 = input_dataset_2.dataframe()

    # Example processing
    only_data_from_1 = input_df_1.withColumn('processed_at', F.current_timestamp())
    unioned_version = input_df_1.unionByName(input_df_2).withColumn('processed_at', F.current_timestamp())

    # Instead of returning the modified dataframe, explicitly call "write_dataframe"
    # on the output of your choice with the dataframe you want to write.
    output_dataset_1.write_dataframe(only_data_from_1)
    output_dataset_2.write_dataframe(unioned_version)
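For comparison, a single-input, single-output version of the same processing can stay on @transform_df and simply return the dataframe; the sketch below reuses the placeholder dataset names from above and an illustrative function name.

from transforms.api import transform_df, Input, Output
from pyspark.sql import functions as F


@transform_df(
    Output("3_multi_output_1"),
    input_dataset_1=Input("fake_dataset"),
)
def example_single_output(input_dataset_1):
    # With @transform_df the input is already a dataframe and the returned dataframe is written for you
    return input_dataset_1.withColumn('processed_at', F.current_timestamp())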
code authoring
, code repositories
, python