データ統合パイプラインのビルド非構造化データのパイプラインCSVまたはJSONファイルのスキーマを推定する

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

CSVまたはJSONファイルのスキーマを推定する

データセットにスキーマがある場合、Foundryでの作業が最も簡単になります。Foundryでは、データセットに含まれるCSVまたはJSONファイルに手動でスキーマを追加することができます。これは、データセット内の スキーマを適用する ボタンを選択することで行えます。 スキーマを適用する ボタンは、データの一部に基づいて自動的にスキーマを推定します。スキーマが適用されたら、データセットビューで スキーマを編集する を選択し、行タイプを変更したり、追加の解析オプションを適用してジャギージャギした行をドロップしたり、エンコーディングを変更したり、ファイルパス、行のバイトオフセット、インポートタイムスタンプ、行番号などの追加の行を追加したりします。

初期データセットのファイルに基づいて静的に適用されたスキーマは、データが変更されると古くなる可能性があります。したがって、半構造化データの変換パイプラインの最初のステップとして、Sparkが動的にスキーマを推定することが有用です。

パイプラインビルドの各回でスキーマを動的に推定することはパフォーマンスコストがかかるため、この技術は慎重に使用するべきです(例えば、スキーマが変更される可能性がある場合など)。

以下に、CSVとJSONの入力の例を示します。

Transformsのデフォルトの出力ファイル形式であるParquetは、自動的に推定されたスキーマに存在する可能性のある特定の特殊文字を許可しません。したがって、以下の例にあるように sanitize_schema_for_parquet を使用することをお勧めします。これにより、潜在的な問題を防ぐことができます。

CSV

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from transforms.api import transform, Input, Output from transforms.verbs.dataframes import sanitize_schema_for_parquet @transform( output=Output("/Company/sourceA/parsed/data"), raw=Input("/Company/sourceA/raw/data_csv"), ) def read_csv(ctx, raw, output): filesystem = raw.filesystem() hadoop_path = filesystem.hadoop_path # ハドゥープファイルパスを作成 files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()] df = ( ctx .spark_session .read .option("encoding", "UTF-8") # UTF-8はデフォルトです .option("header", True) # ヘッダーがあります .option("inferSchema", True) # スキーマを自動推定 .csv(files) ) # データフレームを書き込み、sanitize_schema_for_parquet関数を使用してスキーマを調整 output.write_dataframe(sanitize_schema_for_parquet(df))

JSON

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from transforms.api import transform, Input, Output from transforms.verbs.dataframes import sanitize_schema_for_parquet # transformデコレータを使い、データトランスフォーム関数を定義します @transform( output=Output("/Company/sourceA/parsed/data"), # 変換後のデータの出力先を指定します raw=Input("/Company/sourceA/raw/data_json"), # 入力データのパスを指定します ) def read_json(ctx, raw, output): filesystem = raw.filesystem() # 入力データのファイルシステムオブジェクトを取得します hadoop_path = filesystem.hadoop_path # Hadoopのパスを取得します files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()] # ファイルシステムからファイルのリストを取得します df = ( ctx .spark_session # Sparkセッションを開始します .read # データを読み込むための操作を開始します .option("multiline", False) # Falseはデフォルトです。各ファイルが改行区切りのJSONオブジェクトではなく、単一のJSONオブジェクトを含む場合はTrueを使用します .json(files) # JSONファイルを読み込みます ) # parquet形式でスキーマをクリーンアップしたデータフレームを書き出します output.write_dataframe(sanitize_schema_for_parquet(df))