How do I flatten a hierarchical tree data structure into a flat table with parent-child relationships?
This code uses PySpark to transform a hierarchical tree data structure into a flat table with parent-child relationships. It defines a helper that extracts the nodes of each hierarchy level and generates unique primary keys for nodes and their parents. The output dataframe contains the columns node_id, node_description, node_level, parent_id, parent_level, parents_path, node_pk, and parent_pk.
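Before reading the full transform, the key-generation idea can be seen in isolation. The following is a minimal, self-contained sketch (not part of the recipe itself) that builds the one-row example from the docstring below and derives the level-2 node by hand, showing how node_pk and parent_pk are composed as level__id:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Sample hierarchical input, matching the docstring example in the transform below.
sample = spark.createDataFrame(
    [("root", "folder1", "folder2", "file_name", "file_content")],
    ["level1", "level2", "level3", "level4", "some_value"],
)

# Derive the level-2 node rows by hand: the node is level2, its parent is level1.
level2_nodes = (
    sample
    .select("level1", "level2")
    .distinct()
    .withColumn("node_id", F.col("level2"))
    .withColumn("node_level", F.lit("level2"))
    .withColumn("parent_id", F.col("level1"))
    .withColumn("parent_level", F.lit("level1"))
    .withColumn("node_pk", F.concat_ws("__", "node_level", "node_id"))
    .withColumn("parent_pk", F.concat_ws("__", "parent_level", "parent_id"))
)
level2_nodes.show()
# node_pk = "level2__folder1", parent_pk = "level1__root"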
from transforms.api import transform, Input, Output
from pyspark.sql import functions as F

# CONSTANTS
COL_ORDER = ["level1", "level2", "level3", "level4"]
COLS_DESCRIPTION = {
    "node_id": "Identifier of a node, non-unique",
    "node_description": "Human-readable identifier of the node, non-unique",
    "node_level": "Hierarchy level of a node",
    "parent_id": "Identifier of a node's parent, non-unique",
    "parent_level": "Hierarchy level of a node's parent, non-unique",
    "parents_path": "Array of parent_ids, from highest to closest parent.",
    "node_pk": "Unique key of a node, built as node_level__node_id.",
    "parent_pk": "Unique key of a node's parent, built as parent_level__parent_id."
}

'''
Translate something of the format:

level1 | level2  | level3  | level4    | some_value
root   | folder1 | folder2 | file_name | file_content

into:

node_id   | node_level | parent_id | parent_level | value
root      | level1     | null      | null         | null
folder1   | level2     | root      | level1       | null
folder2   | level3     | folder1   | level2       | null
file_name | level4     | folder2   | level3       | file_content
'''


@transform(
    out=Output("flattened_tree_output"),        # placeholder dataset reference
    tree_df=Input("hierarchical_tree_input"),   # placeholder dataset reference
)
def flatten_tree_data(tree_df, out):
    tree_df = tree_df.dataframe()

    # Function to extract objects "per level".
    def create_object(df,
                      node_id_col="level2",
                      node_description_col="level2",
                      node_level="level2",
                      parent_ids_cols=["level1", "...", "level3"],
                      parent_id_col="level3",
                      parent_level="level3"):
        # Filter down the columns.
        # Note: parent_id_col shall be included in parent_ids_cols.
        # Using a set to remove potential duplicates if id columns and description columns are the same.
        columns_to_keep = list(set([node_id_col, node_description_col, *parent_ids_cols]))
        out_df = df.select(columns_to_keep)

        # DISTINCT to drop duplicates
        out_df = out_df.distinct()

        # Store values of the specific node
        out_df = out_df.withColumn("node_id", F.col(node_id_col))
        out_df = out_df.withColumn("node_level", F.lit(node_level))
        out_df = out_df.withColumn("node_description", F.col(node_description_col))

        # Handle top node that has no parent
        is_top_node = parent_id_col is None and parent_level is None
        if not is_top_node:
            # Store values for its parent
            out_df = out_df.withColumn("parent_id", F.col(parent_id_col))
            out_df = out_df.withColumn("parent_level", F.lit(parent_level))
        else:
            # TODO: remove logic in favor of allowMissingColumns=True / Spark 3 feature
            out_df = out_df.withColumn("parent_id", F.lit(None))
            out_df = out_df.withColumn("parent_level", F.lit(None))

        # Concat its parent ids to get its path
        out_df = out_df.withColumn("parents_path", F.array(*parent_ids_cols))

        # Cleanup before key generation
        out_df = out_df.select("node_id", "node_description", "node_level",
                               "parent_id", "parent_level", "parents_path")

        # PKs are useful to "self-join"
        # Generate PK for node
        pk_cols = ["node_level", "node_id"]
        out_df = out_df.withColumn("node_pk", F.concat_ws("__", *pk_cols))

        # Generate PK for parent
        pk_cols = ["parent_level", "parent_id"]
        out_df = out_df.withColumn("parent_pk", F.concat_ws("__", *pk_cols))

        # Generate Title column
        title_cols = ["node_level", "node_description", "node_id"]
        out_df = out_df.withColumn("title", F.concat_ws(" - ", *title_cols))

        return out_df

    out_df = create_object(tree_df, "level4", "level4", "level4", ["level1", "level2", "level3", "level4"], "level3", "level3")
    tmp_df = create_object(tree_df, "level3", "level3", "level3", ["level1", "level2", "level3"], "level2", "level2")
    out_df = out_df.unionByName(tmp_df)
    tmp_df = create_object(tree_df, "level2", "level2", "level2", ["level1", "level2"], "level1", "level1")
    out_df = out_df.unionByName(tmp_df)
    tmp_df = create_object(tree_df, "level1", "level1", "level1", [], None, None)
    out_df = out_df.unionByName(tmp_df)  # TODO SPARK 3: , allowMissingColumns=True

    out.write_dataframe(out_df, column_descriptions=COLS_DESCRIPTION)
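Because node_pk and parent_pk share the same level__id format, the flattened table can be self-joined to walk the hierarchy. A minimal sketch, assuming flat_df is the dataframe written by the transform above (the variable name and the parent_description alias are illustrative):

from pyspark.sql import functions as F

# Expose each node under its key so it can be matched against parent_pk.
parents = flat_df.select(
    F.col("node_pk").alias("parent_pk"),
    F.col("node_description").alias("parent_description"),
)

# Left join: top-level nodes (no matching parent) keep a null parent_description.
enriched = flat_df.join(parents, on="parent_pk", how="left")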
code repositories, code authoring, python, graph, tree
How do I extract ancestors and descendants from a graph dataset using PySpark and NetworkX?
This code uses PySpark and NetworkX to prepare a graph dataset, create a directed graph, and extract the ancestors and descendants of each node in the graph.
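As a reminder of the underlying graph operations (a standalone sketch, independent of the transform code below), NetworkX can compute ancestor and descendant sets directly on a small directed graph; the recipe instead walks the graph with bfs_tree and edge_dfs so each node gets an ordered list rather than an unordered set:

import networkx as nx

# Toy directed graph: root -> a -> b
g = nx.DiGraph()
g.add_edges_from([("root", "a"), ("a", "b")])

print(nx.descendants(g, "root"))  # {'a', 'b'}
print(nx.ancestors(g, "b"))       # {'root', 'a'}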
from transforms.api import transform_df, Input, Output
from pyspark.sql import functions as F, types as T
import networkx as nx

GRAPH_SCHEMA = T.StructType([
    T.StructField("node_id", T.StringType()),
    T.StructField("descendants", T.ArrayType(T.StringType())),
    T.StructField("ancestors", T.ArrayType(T.StringType())),
])


# Step 1: Prepare the dataset
@transform_df(
    Output("prepared_graph_output"),
    graph_structured_dataset=Input("original_dataset_input")
)
def prepare_graph(graph_structured_dataset):
    vertices = get_vertices(graph_structured_dataset)
    edges = get_edges(graph_structured_dataset)
    df = vertices.unionByName(edges)
    return df


def get_vertices(df):
    df = (
        df
        .select(
            "node_id",  # The ID of the node
            F.lit(None).cast(T.StringType()).alias("child_id"),  # An empty "child_id" column so the output can be merged with the edges
            F.lit("vertex").alias("type"),  # The type of this row (it represents a vertex)
            F.col("_partition_column"),  # The property on which nodes can be partitioned so the computation can run in parallel
        )
        .dropDuplicates(["node_id"])
    )
    return df


def get_edges(df):
    df = (
        df
        .filter(F.col("parent_node_id").isNotNull())
        .select(
            F.col("parent_node_id").alias("node_id"),  # The ID of the node
            F.col("node_id").alias("child_id"),  # A reference to the child of this node
            F.lit("edge").alias("type"),  # The type of this row (it represents an edge)
            F.col("_partition_column"),  # The property on which nodes can be partitioned so the computation can run in parallel
        )
        .dropDuplicates(["node_id", "child_id"])
    )
    return df


# Step 2: Create the graph using networkx and extract the properties you need
@transform_df(
    Output("extracted_graph_properties"),
    prepared_graph=Input("prepared_graph_output"),
)
def extract_graph_properties(prepared_graph):
    out = (
        prepared_graph
        .groupby("_partition_column")
        .applyInPandas(
            myNetworkxUserDefinedFunction,
            schema=GRAPH_SCHEMA
        )
    )
    # Replace empty ancestor arrays (top-level nodes) with nulls
    out = out.withColumn(
        "ancestors",
        F.when(F.size(F.col("ancestors")) == 0, F.lit(None)).otherwise(F.col("ancestors"))
    )
    return out


def myNetworkxUserDefinedFunction(pandas_dataframe):
    vertices = pandas_dataframe[pandas_dataframe["type"] == "vertex"].copy()
    edges = pandas_dataframe[pandas_dataframe["type"] == "edge"]
    df = vertices

    g = nx.DiGraph()
    g.add_edges_from(edges[['node_id', 'child_id']].to_records(index=False))

    def get_descendants(source):
        # Nodes with no outgoing edges have no descendants
        if not (edges['node_id'] == source).any():
            return None
        descendants = list(nx.bfs_tree(g, source))
        return descendants[1:]

    df["descendants"] = df["node_id"].apply(get_descendants)

    def get_ancestors(source):
        # Walk the edges backwards from the node; the closest parent comes first
        path = [source] + [parent for parent, child, _ in nx.edge_dfs(g, source=source, orientation="reverse")]
        return path[1:]

    df["ancestors"] = df["node_id"].apply(get_ancestors)

    return df[["node_id", "ancestors", "descendants"]]
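The pandas UDF can be sanity-checked locally without Spark by calling it on a small dataframe shaped like the prepared graph. A minimal sketch, assuming the code above has been run; the node IDs and the _partition_column value are arbitrary toy data:

import pandas as pd

# Toy graph a -> b -> c: one vertex row per node, one edge row per link.
toy = pd.DataFrame([
    {"node_id": "a", "child_id": None, "type": "vertex", "_partition_column": 1},
    {"node_id": "b", "child_id": None, "type": "vertex", "_partition_column": 1},
    {"node_id": "c", "child_id": None, "type": "vertex", "_partition_column": 1},
    {"node_id": "a", "child_id": "b", "type": "edge", "_partition_column": 1},
    {"node_id": "b", "child_id": "c", "type": "edge", "_partition_column": 1},
])

print(myNetworkxUserDefinedFunction(toy))
# Expected: "a" has descendants [b, c] and no ancestors;
#           "c" has descendants None and ancestors [b, a] (closest parent first).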
code authoring, code repositories, python, tree, graph, networkx