How do I perform fuzzy matching of entity names using phonetic codes in PySpark?
This code uses PySpark to clean entity names, generate phonetic codes, and perform fuzzy matching of entity names using the Jaro similarity metric. It is useful for matching similar entity names in two datasets.
Copied!1from pyspark.sql import functions as F 2from pyspark.sql import types as T 3from transforms.api import transform_df, Input, Output 4import re 5import jellyfish 6 7 8def _add_phonetic_codes(df): 9 # Generate phonetic codes for each part of the name 10 df = df.withColumn( 11 "name_part", F.split("cleaned_name", " ") 12 ).withColumn( 13 "name_part", F.explode("name_part") 14 ).withColumn( 15 "phonetic_code", F.soundex("name_part") 16 ).drop("name_part") 17 return df 18 19 20@transform_df( 21 Output(), 22 entities2=Input(), 23 entities1=Input(), 24) 25def compute(sanctions, entities): 26 27 # Set up UDF for cleaning text 28 def clean_text(text): 29 cleaned_text = re.sub(r" +", " ", re.sub(r"[./-]+", "", text)).lower() 30 return cleaned_text 31 32 clean_text_udf = F.udf(clean_text, T.StringType()) 33 34 # Clean entity name 35 entities2 = entities2.withColumn("cleaned_name", clean_text_udf(F.col("name"))) 36 entities1 = entities1.withColumn("cleaned_name", clean_text_udf(F.col("entity_name"))) 37 38 # Add phonetic codes 39 entities2 = _add_phonetic_codes(entities2) 40 entities1 = _add_phonetic_codes(entities1) 41 42 # Fuzzy join 43 matched_entities = entities1.join( 44 entities2, on=["phonetic_code"], how="inner" 45 ).select( 46 entities1.cleaned_name.alias("cleaned_name1"), entities1.id.alias("entity_id1") 47 entities2.cleaned_name.alias("cleaned_name2"), entities2.id.alias("entity_id2") 48 ).drop("phonetic_code") 49 matched_entities = matched_entities.dropDuplicates() 50 51 # Set up UDF for string comparison 52 @F.udf() 53 def jaro_compare(name1, name2): 54 return jellyfish.jaro_similarity(name1, name2) 55 56 # Fuzzy matching 57 matched_entities = matched_entities.withColumn( 58 "match_score", jaro_compare("cleaned_name1", "cleaned_name2") 59 ) 60 matched_entities = matched_entities.filter(entities.match_score > 0.75) 61 matched_entities = matched_entities.select("entity_id1", "entity_id2") 62 return matched_entities
pyspark
, fuzzy matching
, phonetic codes
, jaro similarity