コードの例Graph and tree structured datasetsトランスフォーム
Warning

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

トランスフォーム

Python

階層ツリーデータをフラット化する

階層ツリーデータ構造を親子関係を持つフラットなテーブルにフラット化するにはどうすればよいですか?

このコードは、PySpark を使用して階層ツリーデータ構造を親子関係を持つフラットなテーブルにトランスフォームします。レベルごとにオブジェクトを抽出する関数を作成し、ノードおよび親の一意の主キーを生成します。出力データフレームには、node_idnode_descriptionnode_levelparent_idparent_levelparents_pathnode_pk、および parent_pk の列が含まれます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 from transforms.api import transform_df, Input, Output, transform from pyspark.sql import functions as F # 定数 COL_ORDER = ["level1", "level2", "level3", "level4"] COLS_DESCRIPTION = { "node_id": "ノードの識別子、一意ではない", "node_description": "人間が読み取れるノードの識別子、一意ではない", "node_level": "ノードの階層レベル", "parent_id": "ノードの親の識別子、一意ではない", "parent_level": "人間が読み取れるノードの親の識別子、一意ではない", "parents_path": "最も上位の親から最も近い親までのparent_idの配列", "node_pk": "一意の識別子", "parent_pk": "一意の識別子" } ''' 以下のような形式のデータを: level1 | level2 | level3 | level4 | some_value root | folder1 | folder2 | file_name | file_content 次のように変換する: node_id | node_level | parent_id | parent_level | value root | level1 | null | null | null folder1 | level2 | root | level1 | null folder2 | level3 | folder1 | level2 | null file_name | level4 | folder2 | level3 | file_content ''' def flatten_tree_data(tree_df, out): tree_df = tree_df.dataframe() # 各階層ごとにオブジェクトを抽出する関数 def create_object(df, node_id_col="level2", node_description_col="level2", node_level="level2", parent_ids_cols=["level1", "...", "level3"], parent_id_col="level3", parent_level="level3"): # 列をフィルタリング # 注意: parent_id_colはparent_ids_colsに含まれている必要がある # セットを使って重複を削除 columns_to_keep = list(set([node_id_col, node_description_col, *parent_ids_cols])) out_df = df.select(columns_to_keep) # 重複を削除するためにDISTINCTを使用 out_df = out_df.distinct() # 特定のノードの値を保存 out_df = out_df.withColumn("node_id", F.col(node_id_col)) out_df = out_df.withColumn("node_level", F.lit(node_level)) out_df = out_df.withColumn("node_description", F.col(node_description_col)) # 親がいないトップノードを処理 is_top_node = parent_id_col is None and parent_level is None if not is_top_node: # 親の値を保存 out_df = out_df.withColumn("parent_id", F.col(parent_id_col)) out_df = out_df.withColumn("parent_level", F.lit(parent_level)) else: # TODO: Spark 3の機能allowMissingColumns=Trueを利用するためにロジックを削除 out_df = out_df.withColumn("parent_id", F.lit(None)) out_df = out_df.withColumn("parent_level", F.lit(None)) # 親IDを結合して「パス」を取得 out_df = out_df.withColumn("parents_path", F.array(*parent_ids_cols)) # キー生成前のクリーンアップ out_df = out_df.select("node_id", "node_description", "node_level", "parent_id", "parent_level", "parents_path") # PKは「自己結合」に便利 # ノードのPKを生成 pk_cols = ["node_level", "node_id"] out_df = out_df.withColumn("node_pk", F.concat_ws("__", *pk_cols)) # 親のPKを生成 pk_cols = ["parent_level", "parent_id"] out_df = out_df.withColumn("parent_pk", F.concat_ws("__", *pk_cols)) # タイトル列を生成 title_cols = ["node_level", "node_description", "node_id"] out_df = out_df.withColumn("title", F.concat_ws(" - ", *title_cols)) return out_df out_df = create_object(tree_df, "level4", "level4", "level4", ["level1", "level2", "level3", "level4"], "level3", "level3") tmp_df = create_object(tree_df, "level3", "level3", "level3", ["level1", "level2", "level3"], "level2", "level2") out_df = out_df.unionByName(tmp_df) tmp_df = create_object(tree_df, "level2", "level2", "level2", ["level1", "level2"], "level1", "level1") out_df = out_df.unionByName(tmp_df) tmp_df = create_object(tree_df, "level1", "level1", "level1", [], None, None) out_df = out_df.unionByName(tmp_df) # TODO SPARK 3 : , allowMissingColumns=True out.write_dataframe(out_df, column_descriptions=COLS_DESCRIPTION)
  • 提出日: 2024-03-26
  • タグ: code repositories, Code Authoring, python, graph, tree

グラフデータセットから先祖と子孫を抽出する

PySpark と NetworkX を使用して、グラフデータセットから先祖と子孫を抽出するにはどうすればよいですか?

このコードは、PySpark と NetworkX を使用してグラフデータセットを準備し、有向グラフを作成し、グラフ内の各ノードの先祖と子孫を抽出します。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 from transforms.api import transform_df, Input, Output from pyspark.sql import functions as F, types as T import networkx as nx GRAPH_SCHEMA = T.StructType([ T.StructField("node_id", T.StringType()), T.StructField("descendants", T.ArrayType(T.StringType())), T.StructField("ancestors", T.ArrayType(T.StringType())), ]) # Step 1: データセットの準備 @transform_df( Output("prepared_graph_output"), graph_structured_dataset=Input("original_dataset_input") ) def prepare_graph(graph_structured_dataset): vertices = get_vertices(graph_structured_dataset) edges = get_edges(graph_structured_dataset) df = vertices.unionByName(edges) return df # 頂点を取得する関数 def get_vertices(df): df = ( df .select( "node_id", # ノードのID F.lit(None).cast(T.StringType()).alias("child"), # エッジと結合できるように空の"child"列を追加 F.lit("vertex").alias("type"), # この行のタイプ(頂点を表す) F.col("_partition_column"), # 並列計算のためのパーティション列 ) .dropDuplicates(["node_id"]) ) return df # エッジを取得する関数 def get_edges(df): df = ( df .filter(F.col("parent_node_id").isNotNull()) .select( F.col("parent_node_id").alias("node_id"), # ノードのID F.col("node_id").alias("child_id"), # このノードの子ノードの参照 F.lit("edge").alias("type"), # この行のタイプ(エッジを表す) F.col("_partition_column"), # 並列計算のためのパーティション列 ) .dropDuplicates(["node_id", "child_id"]) ) return df # Step 2: networkxを使用してグラフを作成し、必要なプロパティを抽出 @transform_df( Output("extracted_graph_properties"), prepared_graph=Input("prepared_graph_output"), ) def extract_graph_properties(prepared_graph): out = ( prepared_graph .groupby("_partition_column") .applyInPandas( myNetworkxUserDefinedFunction, schema=GRAPH_SCHEMA ) ) out = out.withColumn("ancestors", F.when(F.size(F.col("ancestors")) == 0, F.lit(None)).otherwise(F.col("ancestors")) ) return out # pandasデータフレームを使用してnetworkxグラフを処理する関数 def myNetworkxUserDefinedFunction(pandas_dataframe): vertices = pandas_dataframe[pandas_dataframe["type"] == "vertex"] edges = pandas_dataframe[pandas_dataframe["type"] == "edge"] df = vertices g = nx.DiGraph() g.add_edges_from(edges[['node_id', 'child_id']].to_records(index=False)) # 子孫ノードを取得する関数 def get_descendants(source): if not (edges['node_id'] == source).any(): return None descendents = list(nx.bfs_tree(g, source)) return descendents[1:] df["descendants"] = df["node_id"].apply(get_descendants) # 祖先ノードを取得する関数 def get_ancestors(source): path = [source] + [parent for parent, child, _ in nx.edge_dfs(g, source=source, orientation="reverse")] return path[1:] df["ancestors"] = df["node_id"].apply(get_ancestors) return df[["node_id", "ancestors", "descendants"]]
  • 提出日: 2024-03-20
  • タグ: code authoring, code repositories, python, tree, graph, networkx