コードの例Raw file parsingトランスフォーム

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

トランスフォーム

Python

Python で Excel ファイルを解析する

PySpark と Openpyxl を使用して、動的スキーマを持つ複雑な Excel ファイルを分散処理で読み込み、処理する方法は?

このコードは PySpark と Openpyxl ライブラリを使用して、入力ファイルシステムから複数の Excel ファイルを読み取り、その内容を解析し、PySpark DataFrame に変換します。DataFrame は 1 つの DataFrame に結合され、出力に書き込まれます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 from pyspark.sql import functions as F, types as T, DataFrame from transforms.api import transform, Input, Output, configure import tempfile import shutil import openpyxl import functools @transform( processed_excel=Output("example_processed_dataframe"), excel_input=Input("example_excel_dataframe"), ) def compute(ctx, processed_excel, excel_input): def parse_file(file_status): # Excelファイルを開く with excel_input.filesystem().open(file_status.path, "rb") as in_xlsx: # 処理用の一時ファイルを作成 with tempfile.NamedTemporaryFile(suffix=".xlsx") as tmp_xlsx: shutil.copyfileobj(in_xlsx, tmp_xlsx) tmp_xlsx.flush() # Excelワークブックをロードし、その内容を解析 try: workbook = openpyxl.load_workbook(tmp_xlsx.name) return parse_workbook(workbook) except: return None # 入力ファイルシステムからExcelファイルのリストを取得 files_df = excel_input.filesystem().files() # 'parse_file'関数を使用して各ファイルを解析 parsed_files = files_df.rdd.map(parse_file).collect() # 解析されたファイルをPySparkデータフレームに変換 dfs = [] for parsed_file in parsed_files: dfs.append(convert_to_df(ctx, parsed_file)) # データフレームを結合し、結果を出力に書き込む df = functools.reduce(lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), dfs) processed_excel.write_dataframe(df)
  • parse_file関数のコメントを追加し、日本語に翻訳しました。
  • files_dfの取得からデータフレームの結合と出力までのコメントを追加し、日本語に翻訳しました。
  • 提出日: 2024-08-12
  • タグ: Code Authoring, Code Repositories, python, openpyxl

シェープファイルを結合してGeoJSONに変換する

複数のシェープファイルを結合し、GeoJSON形式に変換するにはどうすればよいですか?

このコードは、geospatial_toolsライブラリを使用して複数のシェープファイルを読み込み、それらのジオメトリをGeoJSON形式に変換し、1つのPySpark DataFrameに結合します。また、各ジオメトリの重心を計算し、それをGeohashに変換します。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 from transforms.api import transform, Input, Output from geospatial_tools import geospatial from geospatial_tools.functions import clean_geometry, centroid, geojson_to_geohash import tempfile import shutil import geopandas as gpd from pyspark.sql import types as T from pyspark.sql import functions as F import json from shapely.geometry import mapping @geospatial() @transform( output=Output(), input_data=Input(), ) def compute(ctx, input_data, output): fs = input_data.filesystem() # スキーマの定義 schema = T.StructType([T.StructField("geoshape", T.StringType()), T.StructField("name", T.StringType()), T.StructField("centroid", T.StringType())]) # .shpファイルのリストを取得 shapefiles = [f.path.replace('.shp', '') for f in fs.ls(glob='*shp')] combined_data = ctx.spark_session.createDataFrame([], schema) for shapefile in shapefiles: # NOQA with tempfile.TemporaryDirectory() as tmp_dir: # Shapefileに関連するすべてのファイルをローカルファイルシステムにコピー # .prjや.cpgなど複数のファイルが存在する for shapefile_file in fs.ls(glob=f'{shapefile}.*'): with open(f'{tmp_dir}/{shapefile_file.path}', 'wb') as tmp_file: with fs.open(shapefile_file.path, 'rb') as f: shutil.copyfileobj(f, tmp_file) # GeoJSONジオメトリ列を作成 pdf = gpd.read_file(f'{tmp_dir}/{shapefile}.shp') pdf['geometry'] = pdf.geometry.apply(lambda x: json.dumps(mapping(x))) df = ctx.spark_session.createDataFrame(pdf) # Foundryが期待するEPSG:4326形式に変換 crs = gpd.read_file(f'{tmp_dir}/{shapefile}.shp').crs.to_string() df = df.withColumn( "geoshape", clean_geometry('geometry', crs, lat_long=(crs != "EPSG:4326")) ).select("geoshape") df = df.withColumn('name', F.lit(shapefile)) df = df.withColumn('centroid', geojson_to_geohash(centroid('geoshape'))) combined_data = combined_data.unionByName(df) return output.write_dataframe(combined_data)
  • 提出日: 2024-05-23
  • タグ: geospatial, shapefile, geojson, Geohash, pyspark, geopandas

データセット間で生ファイルをコピーする

Pythonトランスフォーム内で入力データセットから出力データセットに生ファイルをコピーするにはどうすればよいですか?

このコードは、入力データセットから出力データセットに生ファイルをコピーするためのPySparkトランスフォーム関数を定義しています。この関数は 'shutil' ライブラリを使用してファイルのバイトをコピーし、すべてのファイルまたは提供された正規表現パターンに基づいて一部のファイルをコピーすることができます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 from transforms.api import transform, Input, Output from pyspark.sql import DataFrame from functools import reduce import shutil # 入力データセットから出力データセットに生のファイルをコピーするPythonのtransform @transform( my_output=Output("my_output_dataset"), my_input=Input("my_input_dataset") ) def copy_my_input(my_output, my_input): copy_raw_files(my_output, my_input, [".*\.csv"], False) def copy_raw_files(my_output, my_input, regexes, copy_full=False): # 生ファイルをコピーする関数 def copy_file(file_status): # 入力データフレームのファイルシステムで指定されたファイルを開く with my_input.filesystem().open(file_status.path, 'rb') as in_f: # 出力データフレームのファイルシステムでファイルを開く with my_output.filesystem().open(file_status.path, 'wb') as out_f: # 入力から出力へファイルのバイトをコピーする shutil.copyfileobj(in_f, out_f) # すべてのファイルをコピーするか、一部のファイルのみをコピーするかを選択 if copy_full: files_df = my_input.filesystem().files() else: files_to_copy = [] for regex in regexes: # 正規表現にマッチするファイルのみをコピーする files_to_copy.append(my_input.filesystem().files(regex=regex)) # すべてのファイルを含むデータフレームを作成 files_df = reduce(DataFrame.unionByName, files_to_copy) # コピー操作を並列化する files_df.rdd.foreach(copy_file)
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python

ファイル処理

PySpark を使用してデータセット内の複数のファイルを処理するにはどうすればよいですか?

このコードは、PySpark を使用してデータセット内の複数のファイル(gzip 圧縮ファイルを含む)を処理し、各ファイルの最初の行を読み取り、ファイル情報と最初の行の内容を持つデータフレームを作成します。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 from transforms.api import transform, Input, Output, incremental from pyspark.sql import types as T from pyspark.sql import functions as F from pyspark.sql import Row import gzip import io # @incrementalデコレータはどちらでも使用可能 # @transform_dfを@transformに変更 # これにより、入力および出力に対する制御がより柔軟になり、入力データセットの「ファイル」バージョンにアクセスするために必要です @transform( output_dataset_1=Output(""), output_dataset_2=Output(""), input_dataset=Input("") ) def example_transform_file_processing(ctx, input_dataset, output_dataset_1, output_dataset_2): # "files()"メソッドは、入力データセットのファイルシステムを表すデータフレームを返します fs = input_dataset.filesystem() files_df = input_dataset.filesystem().files() # ここで各ファイルのパスを抽出し、必要に応じて処理することができます # ==== 計算の例 # rddのflatmap出力のスキーマを定義 schema = T.StructType([ T.StructField('hadoop_path', T.StringType()), T.StructField('file_name', T.StringType()), T.StructField('size', T.LongType()), T.StructField('modified', T.LongType()), T.StructField('first_row_content', T.StringType()) ]) cols = schema.fieldNames() # "hadoop_path", "file_name", "size", "modified"に相当 MyRow = Row(*cols) # RDDのUDFライクな関数の戻り値の型として使用するための"MyRow"オブジェクトを定義 # 1つのファイルを解析するインライン関数(アイデア:UDFのようなもの、ただしRDD用) def process_file(file_status): # 処理の例:各ファイルの最初の行を読み取る line = "default value" try: line = "WARNING: Not supported file type." if file_status.path.endswith('.gz'): # Gzippedファイルを処理 with fs.open(file_status.path, "rb") as f: gz = gzip.GzipFile(fileobj=f) br = io.BufferedReader(gz) tw = io.TextIOWrapper(br) line = tw.readline() else: with fs.open(file_status.path, "r") as f: line = f.readline() except Exception as e: line = "ERROR: " + str(e) # RDD要素から行を作成する yield MyRow(fs.hadoop_path, file_status.path, file_status.size, file_status.modified, line) # ファイルのデータフレームをRDDに変換。詳細は https://spark.apache.org/docs/latest/rdd-programming-guide.html を参照 rdd = files_df.rdd # RDDの各要素に関数を適用 rdd = rdd.flatMap(process_file) # RDDをデータフレームに変換し、出力に簡単に書き込めるようにする # スキーマを指定することで空のrddを処理できる output_df = ctx.spark_session.createDataFrame(rdd, schema) # タイムスタンプを追加 output_df = output_df.withColumn('processed_at', F.current_timestamp()) # ==== 計算の例終了 # ファイルシステムのデータフレーム表示を出力に書き込み output_dataset_1.write_dataframe(files_df) # 処理されたデータフレームを出力に書き込み output_dataset_2.write_dataframe(output_df)
  • 提出日: 2024-03-20
  • タグ: code authoring, code repositories, python, gzip, zip

PySparkを使用してORCファイルを読み込む

PySparkを使用してORCファイルをどのように読み込みますか?

このコードは、入力データセットのHadoopパスから生のORCファイルを読み込み、結果のSparkデータフレームを出力に書き込みます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 from transforms.api import transform, Input, Output @transform( out=Output("output"), raw=Input("input"), ) def compute(ctx, out, raw): hadoop_path = raw.filesystem().hadoop_path # Hadoopのパスを取得 df = ctx.spark_session.read.format('orc').load(f'{hadoop_path}/') # ORCフォーマットでデータを読み込み out.write_dataframe(df) # データフレームを書き出し
  • Date submitted: 2024-07-18
  • Tags: pyspark, dataframe, orc, hadoop

PySpark を使用して SAS ファイルを解析する

SAS データセットから PySpark dataframe を作成するにはどうすればよいですか?

このコードは、未処理の SAS ファイルを含む入力データセットを取得し、PySpark dataframe を作成するトランスフォーム関数を定義します。spark-sas7bdat パッケージを使用して SAS ファイルを読み取り、dataframe にロードします。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @transform( output=Output("xxxxx"), # Foundry RIDをここに記入 input_df=Input("xxxxx") # Foundry RIDをここに記入 ) def parse_sas_file(ctx, input_df, output, sas_path="*.sas7bdat"): ''' SASデータセットからPySparkデータフレームを作成します この関数はドライバで計算を行うため、ドライバメモリの増加が必要な場合があります ctx: Sparkコンテキスト input_df: 生のSASファイルを含む入力データセット sas_path: データセット内のSASファイルのパス、デフォルトではデータセット内のすべてのSASファイル include_filename_as_field: ファイル名を下流解析用のカラムとして含めるかどうか、デフォルトはfalse ''' fs = input_df.filesystem() # ファイルシステムを取得 hadoop_path = fs.hadoop_path # Hadoopパスを取得 files_df = fs.files(sas_path) # 指定されたパスのファイルを取得 # dfs = [] spark_session = ctx.spark_session.builder.appName(ctx.spark_session.sparkContext.appName).config('spark.jars.packages', 'saurfang:spark-sas7bdat:3.0.0-s_2.12').getOrCreate() # TODO: 複数パスに対応するように更新する # バッキングデータセットからファイルを読み込む path = files_df.collect()[0].path # 最初のファイルのパスを取得 full_path = f'{hadoop_path}/{path}' # フルパスを生成 df = spark_session.read.format("com.github.saurfang.sas.spark").load(full_path) # SASファイルを読み込む output.write_dataframe(df) # データフレームを書き出す
  • 提出日: 2024-07-29
  • タグ: pyspark, dataframe, sas, code repositories

複数ファイルの処理と結合

データセット内の複数ファイルを処理し、1 つの PySpark DataFrame に結合するにはどうすればよいですか?

このコードは、複数ファイルを含む入力データセットを受け取り、各ファイルを個別に処理し、その結果を 1 つの PySpark DataFrame に結合する PySpark トランスフォームを定義します。データセット内の各ファイルに 'parse_file' 関数を適用するために 'map' 関数を使用し、結果を収集し、すべての DataFrame を結合します。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 from pyspark.sql import functions as F, types as T, DataFrame from transforms.api import transform, Input, Output, configure import tempfile import shutil import functools # PySparkの変換を定義 @transform( processed_files=Output("example_processed_files_dataset"), file_dataset=Input("example_file_dataset"), ) def compute(ctx, processed_files, file_dataset): # 単一のファイルを解析する関数 def parse_file(file_status): with file_dataset.filesystem().open(file_status.path, "rb") as in_file: with tempfile.NamedTemporaryFile() as tmp_file: shutil.copyfileobj(in_file, tmp_file) tmp_file.flush() # ファイルをローカルで処理し、Pythonオブジェクトを返す return process_file_locally_and_return_python_object(tmp_file) # データセット内のファイルのリストを取得 files_df = file_dataset.filesystem().files() # 各ファイルを解析し、結果を収集 parsed_files = files_df.rdd.map(parse_file).collect() dfs = [] for parsed_file in parsed_files: # 解析されたファイルをPySpark DataFrameに変換 dfs.append(convert_to_df(ctx, parsed_file)) # すべてのDataFrameを結合 df = functools.reduce(lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), dfs) # 結果のDataFrameを出力データセットに書き込む processed_files.write_dataframe(df)
  • parse_file関数では、file_datasetからファイルを読み込み、一時ファイルにコピーしてローカルで処理します。
  • files_df.rdd.map(parse_file).collect()で各ファイルを解析し、結果を収集します。
  • 解析されたファイルはconvert_to_df関数でDataFrameに変換され、すべてのDataFrameはunionByNameを用いて結合されます。
  • 最後に、結合されたDataFrameをprocessed_filesに書き込みます。
  • 提出日: 2024-03-20
  • タグ: Code Authoring, Code Repositories, python, 生ファイル, 非構造化

DOCXファイルから内容を抽出する

Pythonを使用してDOCXファイルから内容を抽出する方法は?

このコードはpython-docxライブラリを使用して、データセットからDOCXファイルの内容を読み取り、Documentオブジェクトに格納してさらに処理します。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from transforms.api import transform, Input, Output import docx as dx from io import BytesIO @transform( output=Output("output_dataset"), docs=Input("input_dataset"), ) def compute(ctx, docs, output): fs = docs.filesystem() doc_file = list(fs.ls(regex=r'.*\.docx'))[0] # ファイルシステムを使ってファイルを開き、内容をBytesIOオブジェクトに読み込む with fs.open(doc_file.path, 'rb') as f: source_stream = BytesIO(f.read()) document = dx.Document(source_stream) source_stream.close() # documentオブジェクトに対して何らかの処理を行う
  • 提出日: 2024-03-20
  • タグ: Code Authoring, python, python-docx, bytesio, raw files, unstructured

ZIP圧縮されたCSVファイルの処理

入力データセットから複数のCSVファイルを含むZIPファイルを読み取り、処理したデータをPySparkで出力データセットに書き込むにはどうすればよいですか?

このコードは、PySparkを使用して入力データセット内のCSVを含むZIPファイルを読み取り、各CSVの最初の行をスキップして処理し、処理したデータを出力データセットに書き込みます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 from pyspark.sql import functions as F, types as T, DataFrame from transforms.api import transform, Input, Output, configure import shutil import tempfile import zipfile import io @transform( my_output=Output("my_output_dataset"), my_input=Input("my_input_dataset") ) def compute(ctx, my_output, my_input): # 入力データセット内の各ファイルを処理する関数 def process_file(file_status): with fs.open(file_status.path, 'rb') as f: with tempfile.NamedTemporaryFile() as tmp: shutil.copyfileobj(f, tmp) tmp.flush() # ZIPファイルを読み込み、処理する with zipfile.ZipFile(tmp) as archive: for filename in archive.namelist(): with archive.open(filename) as f2: br = io.BufferedReader(f2) tw = io.TextIOWrapper(br) tw.readline() # 各CSVの最初の行をスキップ # CSV内の各行を読み込み、処理する for line in tw: yield MyRow(*line.split(",")) # 入力データセットを読み込み、各ファイルを処理する rdd = my_input.files().rdd rdd = rdd.flatMap(process_file) df = rdd.toDF() # 処理されたデータを出力データセットに書き込む my_output.write_dataframe(df)
  • 提出日: 2024-03-20
  • タグ: code authoring, code repositories, python, zip, csv

データセット内のファイルを解凍および抽出する

データセット内のファイルを解凍するにはどうすればよいですか?

このコードは、PySpark を使用して入力から圧縮ファイルを読み取り、内容を抽出し、抽出されたファイルを出力に書き込みます。圧縮ファイルを順に読み込み、その内容を BytesIO ストリームに読み込み、zipfile ライブラリを使用してファイルを一時ディレクトリに抽出します。抽出されたファイルはその後出力に書き込まれます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 # from pyspark.sql import functions as F from transforms.api import transform, Input, Output import zipfile import tempfile import os from io import BytesIO @transform( unzipped=Output(""), zipped=Input(""), ) def compute(unzipped, zipped): # zippedディレクトリからすべての.zipファイルを取得する zip_files = zipped.filesystem().files(glob="*.zip").collect() for zip_file in zip_files: # .zipファイルをバイナリ読み込みモードで開く with zipped.filesystem().open(zip_file["path"], 'rb') as zip_f: source_stream = BytesIO(zip_f.read()) # .zipファイルを読み込む with zipfile.ZipFile(source_stream, 'r') as zip_ref: # 一時ディレクトリを作成し、.zipファイルの内容を展開する with tempfile.TemporaryDirectory() as temp_dir: zip_ref.extractall(temp_dir) # 展開されたファイルを処理する for path in iterate_directories(temp_dir): output_file_name = path.replace(temp_dir, "") # 出力ファイルを開き、内容を書き込む with unzipped.filesystem().open(output_file_name, "w") as out_f: with open(path, 'r') as in_f: out_f.write(in_f.read()) def iterate_directories(directory): # 指定されたディレクトリを再帰的に走査し、ファイルパスを生成する for root, dirs, files in os.walk(directory): for file in files: path = os.path.join(root, file) if is_leaf_file(path): yield path def is_leaf_file(path): # 指定されたパスがファイルであり、シンボリックリンクでない場合にTrueを返す return os.path.isfile(path) and not os.path.islink(path)
  • 提出日: 2024-03-20
  • タグ: Code Authoring, Code Repositories, python, raw files, zip, unzip

データセットファイルをZipにする

ファイルのデータセットからZipファイルを作成するにはどうすればよいですか?

このコードは、トランスフォーム API を使用してソースデータセットからすべての Markdown ファイルを読み取り、これらのファイルを含むZipファイルを作成します。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from transforms.api import transform, Input, Output import zipfile @transform( my_output=Output(""), source_df=Input(""), ) def compute(ctx, my_output, source_df): # .mdファイルを収集 files = source_df.filesystem().files(glob="*.md").collect() # 出力用のZIPファイルを作成 with my_output.filesystem().open("foundry_code_examples.zip", 'wb') as write_zip: with zipfile.ZipFile(write_zip.name, 'w') as zip_file: for file_row in files: # 各.mdファイルをZIPに追加 with source_df.filesystem().open(file_row["path"], 'rb') as markdown_file: zip_file.write(markdown_file.name, arcname=file_row["path"]) # 入力データフレームをそのまま返す return source_df
  • 提出日: 2024-03-26
  • タグ: raw files, zip, python, Code Authoring, Code Repositories, export

Java

複雑で複数行のヘッダーを持つExcelファイルを解析する

複雑で複数行のヘッダーを持つExcelファイルをどのように解析できますか?

このコードは、transforms-excel-parserライブラリを使用して複雑なヘッダーを持つExcelファイルを解析する方法を示しています。MultilayerMergedHeaderExtractorを使用してTableParserを作成し、次にTableParserを使用してTransformsExcelParserを作成します。最後に、TransformsExcelParserを使用して入力データセットのExcelファイルからデータを抽出し、その結果を出力に書き込みます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package myproject.datasets; import com.palantir.transforms.excel.ParseResult; import com.palantir.transforms.excel.Parser; import com.palantir.transforms.excel.TransformsExcelParser; import com.palantir.transforms.excel.table.MultilayerMergedHeaderExtractor; import com.palantir.transforms.excel.table.TableParser; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.FoundryInput; import com.palantir.transforms.lang.java.api.FoundryOutput; import com.palantir.transforms.lang.java.api.Input; import com.palantir.transforms.lang.java.api.Output; import java.util.Optional; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; public final class ComplexHeaderExcel { @Compute public void myComputeFunction( @Input("<input_dataset_rid>") FoundryInput myInput, @Output("<output_dataset_rid>") FoundryOutput myOutput, @Output("<error_output_dataset_rid>") FoundryOutput errorOutput ) { // 複数層にわたる結合ヘッダーを抽出するTableParserを作成 Parser tableParser = TableParser.builder() .headerExtractor(MultilayerMergedHeaderExtractor.builder() .topLeftCellName("A1") // ヘッダーの左上のセルの名前 .bottomRightCellName("D2") // ヘッダーの右下のセルの名前 .build()) .build(); // TableParserを使用してTransformsExcelParserを作成 TransformsExcelParser transformsParser = TransformsExcelParser.of(tableParser); // 入力を解析 ParseResult result = transformsParser.parse(myInput.asFiles().getFileSystem().filesAsDataset()); // 解析されたデータを取得(入力に行がない場合やエラーが発生した場合は空) Optional<Dataset<Row>> maybeDf = result.singleResult(); // 解析されたデータが空でない場合、出力データセットに書き込む maybeDf.ifPresent(df -> myOutput.getDataFrameWriter(df).write()); // エラー情報をエラー出力データセットに書き込む errorOutput.getDataFrameWriter(result.errorDataframe()).write(); } }
  • Date submitted: 2024-08-08
  • Tags: Code Authoring, Code Repositories, java, transforms-excel-parser, excel

非表形式 (フォーム) データを含む Excel ファイルの解析

データが表形式でない Excel ファイルをどのように解析しますか?

このコードは、複数のシートにわたるフォームを含む Excel ファイルからデータを抽出するために transforms-excel-parser ライブラリを使用する方法を示しています。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 package myproject.datasets; import com.palantir.transforms.excel.TransformsExcelParser; import com.palantir.transforms.excel.ParseResult; import com.palantir.transforms.excel.Parser; import com.palantir.transforms.excel.form.FieldSpec; import com.palantir.transforms.excel.form.FormParser; import com.palantir.transforms.excel.form.Location; import com.palantir.transforms.excel.form.cellvalue.AdjacentCellAssertion; import com.palantir.transforms.excel.form.cellvalue.CellValue; import com.palantir.transforms.excel.functions.RegexSubstringMatchingSheetSelector; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.FoundryInput; import com.palantir.transforms.lang.java.api.FoundryOutput; import com.palantir.transforms.lang.java.api.Input; import com.palantir.transforms.lang.java.api.Output; public final class FormStyleExcel { private static final String FORM_A_KEY = "FORM_A"; private static final String FORM_B_KEY = "FORM_B"; @Compute public void myComputeFunction( @Input("<input_dataset_rid") FoundryInput myInput, @Output("<form_a_output_dataset_rid>") FoundryOutput formAOutput, @Output("<form_b_output_dataset_rid>") FoundryOutput formBOutput, @Output("<error_output_dataset_rid>") FoundryOutput errorOutput) { // Form A パーサーの設定 Parser formAParser = FormParser.builder() .sheetSelector(new RegexSubstringMatchingSheetSelector("Form_A")) .addFieldSpecs(createFieldSpec("form_a_field_1", "B1")) .addFieldSpecs(createFieldSpec("form_a_field_2", "B2")) .build(); // Form B パーサーの設定 Parser formBParser = FormParser.builder() .sheetSelector(new RegexSubstringMatchingSheetSelector("Form_B")) .addFieldSpecs(createFieldSpec("form_b_field_1", "B1")) .addFieldSpecs(createFieldSpec("form_b_field_2", "B2")) .build(); // Form A と Form B の両方のパーサーを含む TransformsExcelParser の設定 TransformsExcelParser transformsParser = TransformsExcelParser.builder() .putKeyToParser(FORM_A_KEY, formAParser) .putKeyToParser(FORM_B_KEY, formBParser) .build(); // 入力データの解析 ParseResult result = transformsParser.parse(myInput.asFiles().getFileSystem().filesAsDataset()); // 解析されたデータを出力データセットに書き込む result.dataframeForKey(FORM_A_KEY) .ifPresent(df -> formAOutput.getDataFrameWriter(df).write()); result.dataframeForKey(FORM_B_KEY) .ifPresent(df -> formBOutput.getDataFrameWriter(df).write()); // エラー情報をエラー出力データセットに書き込む errorOutput.getDataFrameWriter(result.errorDataframe()).write(); } // 適切なアサーションを持つ FieldSpec を簡潔に作成するためのヘルパーメソッド private static FieldSpec createFieldSpec(String fieldName, String cellLocation) { return FieldSpec.of( fieldName, CellValue.builder() .addAssertions(AdjacentCellAssertion.left(1, fieldName)) .location(Location.of(cellLocation)) .build()); } }
  • Date submitted: 2024-08-06
  • Tags: code authoring, code repositories, java, transforms-excel-parser, excel

シンプルな表形式のExcelファイルを解析する

Transforms Excel Parser を使用してシンプルな表形式のExcelファイルをどのように解析しますか?

このコードは、transforms-excel-parserライブラリを使用してシンプルな表形式のExcelファイルを含むデータセットを解析する方法を示しています。SimpleHeaderExtractor を使用して TableParser を作成し、その後 TableParser を使用して TransformsExcelParser を作成します。最後に、TransformsExcelParser を使用して入力データセット内のファイルを解析し、抽出されたデータを出力データセットに書き込みます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package myproject.datasets; import com.palantir.transforms.excel.ParseResult; import com.palantir.transforms.excel.Parser; import com.palantir.transforms.excel.TransformsExcelParser; import com.palantir.transforms.excel.table.SimpleHeaderExtractor; import com.palantir.transforms.excel.table.TableParser; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.FoundryInput; import com.palantir.transforms.lang.java.api.FoundryOutput; import com.palantir.transforms.lang.java.api.Input; import com.palantir.transforms.lang.java.api.Output; import java.util.Optional; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; public final class SimpleTabularExcel { @Compute public void myComputeFunction( @Input("<input_dataset_rid>") FoundryInput myInput, @Output("<output_dataset_rid>") FoundryOutput myOutput, @Output("<error_output_dataset_rid>") FoundryOutput errorOutput ) { // SimpleHeaderExtractorを適切に構成したTableParserを作成 // この例では、ファイルのヘッダーは2行目にあります。 // ヘッダーが1行目にある場合、rowsToSkipを指定する必要はありません。 // デフォルト値は0であり、その場合、TableParser.builder().build()だけで済みます。 Parser tableParser = TableParser.builder() .headerExtractor( SimpleHeaderExtractor.builder().rowsToSkip(1).build()) .build(); // TableParserを使用してTransformsExcelParserを作成 TransformsExcelParser transformsParser = TransformsExcelParser.of(tableParser); // 入力を解析 ParseResult result = transformsParser.parse(myInput.asFiles().getFileSystem().filesAsDataset()); // 解析されたデータを取得、入力に行がない場合やエラーが発生した場合は空になる可能性があります Optional<Dataset<Row>> maybeDf = result.singleResult(); // 解析されたデータが空でない場合、出力データセットに書き込み maybeDf.ifPresent(df -> myOutput.getDataFrameWriter(df).write()); // エラー情報をエラー出力に書き込み errorOutput.getDataFrameWriter(result.errorDataframe()).write(); }
  • 提出日: 2024-08-08
  • タグ: code authoring, code repositories, java, transforms-excel-parser, excel