How can I read and process complex Excel files with dynamic schema in a distributed manner using PySpark and Openpyxl?
This code uses PySpark and the openpyxl library to read multiple Excel files from an input dataset's filesystem, parse their content, and convert them into PySpark DataFrames. The DataFrames are then combined into a single DataFrame and written to the output.
from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, configure
import tempfile
import shutil
import openpyxl
import functools


@transform(
    processed_excel=Output("example_processed_dataframe"),
    excel_input=Input("example_excel_dataframe"),
)
def compute(ctx, processed_excel, excel_input):
    def parse_file(file_status):
        # Open the Excel file
        with excel_input.filesystem().open(file_status.path, "rb") as in_xlsx:
            # Create a temporary file for processing
            with tempfile.NamedTemporaryFile(suffix=".xlsx") as tmp_xlsx:
                shutil.copyfileobj(in_xlsx, tmp_xlsx)
                tmp_xlsx.flush()
                # Load the Excel workbook and parse its content
                # (parse_workbook is a user-defined helper; a sketch follows below)
                try:
                    workbook = openpyxl.load_workbook(tmp_xlsx.name)
                    return parse_workbook(workbook)
                except Exception:
                    return None

    # Get the list of Excel files from the input filesystem
    files_df = excel_input.filesystem().files()

    # Parse each file using the 'parse_file' function
    parsed_files = files_df.rdd.map(parse_file).collect()

    # Convert the parsed files to PySpark dataframes
    # (convert_to_df is a user-defined helper that builds a dataframe from a parsed file)
    dfs = []
    for parsed_file in parsed_files:
        dfs.append(convert_to_df(ctx, parsed_file))

    # Union the dataframes and write the result to the output
    df = functools.reduce(lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), dfs)
    processed_excel.write_dataframe(df)
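The snippet above assumes user-defined parse_workbook and convert_to_df helpers. A minimal sketch of parse_workbook, assuming every cell should be captured as a string together with its sheet name and row number (the exact shape of the returned object is up to you):

def parse_workbook(workbook):
    # Flatten every sheet of the workbook into a list of dictionaries
    parsed_rows = []
    for sheet in workbook.worksheets:
        for row_number, row in enumerate(sheet.iter_rows(values_only=True), start=1):
            parsed_rows.append({
                "sheet": sheet.title,
                "row_number": str(row_number),
                "values": ",".join("" if cell is None else str(cell) for cell in row),
            })
    return parsed_rows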
code authoring, code repositories, python, openpyxl
How do I combine multiple shapefiles and convert them to GeoJSON format?
This code uses the geospatial_tools library to read multiple shapefiles, convert their geometries to GeoJSON format, and combine them into a single PySpark DataFrame. It also computes the centroid of each geometry and converts it to a geohash.
from transforms.api import transform, Input, Output
from geospatial_tools import geospatial
from geospatial_tools.functions import clean_geometry, centroid, geojson_to_geohash
import tempfile
import shutil
import geopandas as gpd
from pyspark.sql import types as T
from pyspark.sql import functions as F
import json
from shapely.geometry import mapping


@geospatial()
@transform(
    output=Output(),
    input_data=Input(),
)
def compute(ctx, input_data, output):
    fs = input_data.filesystem()
    schema = T.StructType([
        T.StructField("geoshape", T.StringType()),
        T.StructField("name", T.StringType()),
        T.StructField("centroid", T.StringType()),
    ])
    shapefiles = [f.path.replace('.shp', '') for f in fs.ls(glob='*shp')]
    combined_data = ctx.spark_session.createDataFrame([], schema)

    for shapefile in shapefiles:  # NOQA
        with tempfile.TemporaryDirectory() as tmp_dir:
            # Copy all files for the shapefile to the local filesystem
            # There are multiple files associated with a shapefile, such as .prj and .cpg
            for shapefile_file in fs.ls(glob=f'{shapefile}.*'):
                with open(f'{tmp_dir}/{shapefile_file.path}', 'wb') as tmp_file:
                    with fs.open(shapefile_file.path, 'rb') as f:
                        shutil.copyfileobj(f, tmp_file)

            # Create a GeoJSON geometry column
            pdf = gpd.read_file(f'{tmp_dir}/{shapefile}.shp')
            pdf['geometry'] = pdf.geometry.apply(lambda x: json.dumps(mapping(x)))
            df = ctx.spark_session.createDataFrame(pdf)

            # Convert everything to EPSG:4326 format expected by Foundry
            crs = gpd.read_file(f'{tmp_dir}/{shapefile}.shp').crs.to_string()
            df = df.withColumn(
                "geoshape",
                clean_geometry('geometry', crs, lat_long=(crs != "EPSG:4326"))
            ).select("geoshape")
            df = df.withColumn('name', F.lit(shapefile))
            df = df.withColumn('centroid', geojson_to_geohash(centroid('geoshape')))
            combined_data = combined_data.unionByName(df)

    return output.write_dataframe(combined_data)
geospatial, shapefile, geojson, geohash, pyspark, geopandas
How can I copy raw files from an input dataset to an output dataset within a Python authoring transform?
This code defines a PySpark transform function to copy raw files from an input dataset to an output dataset. It uses the 'shutil' library to copy the file bytes and allows for copying all files or only a subset based on provided regex patterns.
from transforms.api import transform, Input, Output
from pyspark.sql import DataFrame
from functools import reduce
import shutil


# Copy raw files from the input dataset to the output dataset in a Python transform
@transform(
    my_output=Output("my_output_dataset"),
    my_input=Input("my_input_dataset")
)
def copy_my_input(my_output, my_input):
    copy_raw_files(my_output, my_input, [r".*\.csv"], False)


def copy_raw_files(my_output, my_input, regexes, copy_full=False):
    # Copies the raw files
    def copy_file(file_status):
        # Open the given file in the input dataset's filesystem
        with my_input.filesystem().open(file_status.path, 'rb') as in_f:
            # Open a file in the output dataset's filesystem
            with my_output.filesystem().open(file_status.path, 'wb') as out_f:
                # Copy the file bytes from input to output
                shutil.copyfileobj(in_f, out_f)

    # Choose whether to copy all the files or only a subset
    if copy_full:
        files_df = my_input.filesystem().files()
    else:
        files_to_copy = []
        for regex in regexes:
            # Only copy files that match the regex
            files_to_copy.append(my_input.filesystem().files(regex=regex))
        # Create one dataframe with all the files
        files_df = reduce(DataFrame.unionByName, files_to_copy)

    # This will parallelise the copy operation
    files_df.rdd.foreach(copy_file)
code authoring, code repositories, python
How do I process multiple files in a dataset using PySpark?
This code uses PySpark to process multiple files in a dataset, including gzipped files, by reading the first line of each file and creating a dataframe with the file information and the first line content.
from transforms.api import transform, Input, Output, incremental
from pyspark.sql import types as T
from pyspark.sql import functions as F
from pyspark.sql import Row
import gzip
import io


# Works with or without the @incremental decorator.
# @transform is used instead of @transform_df because it gives more control over
# inputs and outputs and is needed to access the "file" view of the input dataset(s).
@transform(
    output_dataset_1=Output(""),
    output_dataset_2=Output(""),
    input_dataset=Input("")
)
def example_transform_file_processing(ctx, input_dataset, output_dataset_1, output_dataset_2):
    # The "files()" method returns a dataframe representing the filesystem of the input dataset
    fs = input_dataset.filesystem()
    files_df = input_dataset.filesystem().files()
    # Here you can extract the paths of each file, and then process them in any way you need or want

    # ==== Example of computation
    # Defines the schema of the rdd flatMap output
    schema = T.StructType([
        T.StructField('hadoop_path', T.StringType()),
        T.StructField('file_name', T.StringType()),
        T.StructField('size', T.LongType()),
        T.StructField('modified', T.LongType()),
        T.StructField('first_row_content', T.StringType())
    ])
    cols = schema.fieldNames()  # ["hadoop_path", "file_name", "size", "modified", "first_row_content"]
    MyRow = Row(*cols)  # the "MyRow" object is used as the return type of the UDF-like RDD function

    # Inline function to parse one file (idea: like a UDF, but for RDDs)
    def process_file(file_status):
        # Example of processing: read the first line of each file
        line = "default value"
        try:
            line = "WARNING: Not supported file type."
            if file_status.path.endswith('.gz'):
                # Handle gzipped files
                with fs.open(file_status.path, "rb") as f:
                    gz = gzip.GzipFile(fileobj=f)
                    br = io.BufferedReader(gz)
                    tw = io.TextIOWrapper(br)
                    line = tw.readline()
            else:
                with fs.open(file_status.path, "r") as f:
                    line = f.readline()
        except Exception as e:
            line = "ERROR: " + str(e)

        # Create a row out of the RDD element
        yield MyRow(fs.hadoop_path, file_status.path, file_status.size, file_status.modified, line)

    # Convert the files dataframe to an RDD. See https://spark.apache.org/docs/latest/rdd-programming-guide.html
    rdd = files_df.rdd
    # Apply the function to each element of the RDD
    rdd = rdd.flatMap(process_file)
    # Convert the RDD into a dataframe so it can be written to the output easily.
    # Specifying the schema allows handling an empty RDD.
    output_df = ctx.spark_session.createDataFrame(rdd, schema)
    # Add a timestamp
    output_df = output_df.withColumn('processed_at', F.current_timestamp())
    # ==== End Example of computation

    # Write the filesystem's dataframe representation of the input to the first output
    output_dataset_1.write_dataframe(files_df)
    # Write the processed dataframe to the second output
    output_dataset_2.write_dataframe(output_df)
code authoring, code repositories, python, gzip, zip
How do I load an ORC file using PySpark?
This code reads a raw ORC file from the Hadoop path of an input dataset and writes the resulting Spark dataframe to an output.
from transforms.api import transform, Input, Output


@transform(
    out=Output("output"),
    raw=Input("input"),
)
def compute(ctx, out, raw):
    hadoop_path = raw.filesystem().hadoop_path
    df = ctx.spark_session.read.format('orc').load(f'{hadoop_path}/')
    out.write_dataframe(df)
pyspark, dataframe, orc, hadoop
How do I create a PySpark dataframe from SAS datasets?
This code defines a transform function that takes an input dataset containing raw SAS files and creates a PySpark dataframe from them. It uses the spark-sas7bdat package to read the SAS files and load them into a dataframe.
from transforms.api import transform, Input, Output


@transform(
    output=Output("xxxxx"),  # include Foundry RID here
    input_df=Input("xxxxx")  # include Foundry RID here
)
def parse_sas_file(ctx, input_df, output, sas_path="*.sas7bdat"):
    '''
    Creates a PySpark dataframe from SAS datasets.
    Note that this function performs computation in the driver, and may require an increase in driver memory.

    ctx: transform context, used to access the Spark session
    input_df: input dataset containing raw SAS files
    sas_path: path to SAS files within the dataset; defaults to all SAS files in the dataset
    '''
    fs = input_df.filesystem()
    hadoop_path = fs.hadoop_path
    files_df = fs.files(sas_path)

    spark_session = (
        ctx.spark_session.builder
        .appName(ctx.spark_session.sparkContext.appName)
        .config('spark.jars.packages', 'saurfang:spark-sas7bdat:3.0.0-s_2.12')
        .getOrCreate()
    )

    # TODO: Update this to work for multiple paths
    # Read in whatever files exist in the backing dataset
    path = files_df.collect()[0].path
    full_path = f'{hadoop_path}/{path}'

    df = spark_session.read.format("com.github.saurfang.sas.spark").load(full_path)
    output.write_dataframe(df)
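The TODO above notes that only the first file is read. One possible extension (a sketch, not part of the original example, replacing the single-path read inside parse_sas_file) reads every matching SAS file and unions the results, assuming the files share a compatible schema:

    # Hypothetical extension: read every SAS file in the dataset and union the
    # resulting dataframes. Assumes all files share a compatible schema.
    import functools

    paths = [row.path for row in files_df.collect()]
    dfs = [
        spark_session.read.format("com.github.saurfang.sas.spark").load(f'{hadoop_path}/{p}')
        for p in paths
    ]
    df = functools.reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), dfs)
    output.write_dataframe(df)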
pyspark, dataframe, sas, code repositories
How do I process multiple files in a dataset and combine them into a single PySpark DataFrame?
This code defines a PySpark transform that takes an input dataset containing multiple files, processes each file individually, and combines the results into a single PySpark DataFrame. It uses the 'map' function to apply the 'parse_file' function to each file in the dataset, collects the results, and unions all the DataFrames together.
from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, configure
import tempfile
import shutil
import functools


# Define the PySpark transform
@transform(
    processed_files=Output("example_processed_files_dataset"),
    file_dataset=Input("example_file_dataset"),
)
def compute(ctx, processed_files, file_dataset):
    # Function to parse a single file
    def parse_file(file_status):
        with file_dataset.filesystem().open(file_status.path, "rb") as in_file:
            with tempfile.NamedTemporaryFile() as tmp_file:
                shutil.copyfileobj(in_file, tmp_file)
                tmp_file.flush()
                # Process the file locally and return a Python object
                # (process_file_locally_and_return_python_object is a user-defined helper, not shown here)
                return process_file_locally_and_return_python_object(tmp_file)

    # Get the list of files in the dataset
    files_df = file_dataset.filesystem().files()

    # Parse each file and collect the results
    parsed_files = files_df.rdd.map(parse_file).collect()

    dfs = []
    for parsed_file in parsed_files:
        # Convert the parsed file to a PySpark DataFrame
        # (convert_to_df is a user-defined helper; a sketch follows below)
        dfs.append(convert_to_df(ctx, parsed_file))

    # Union all the DataFrames together
    df = functools.reduce(lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), dfs)

    # Write the resulting DataFrame to the output dataset
    processed_files.write_dataframe(df)
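The helpers process_file_locally_and_return_python_object and convert_to_df are placeholders for user-defined logic. A minimal sketch of convert_to_df, assuming each parsed file is returned as a list of dictionaries mapping column names to string values (it relies on the types-as-T import from the block above):

def convert_to_df(ctx, parsed_file):
    # Build a string-typed dataframe from a list of dictionaries; assumes the
    # parsed object is a list of dicts mapping column names to string values.
    if not parsed_file:
        return ctx.spark_session.createDataFrame([], T.StructType([]))
    columns = sorted({key for row in parsed_file for key in row})
    schema = T.StructType([T.StructField(c, T.StringType()) for c in columns])
    rows = [[row.get(c) for c in columns] for row in parsed_file]
    return ctx.spark_session.createDataFrame(rows, schema)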
code authoring, code repositories, python, raw files, unstructured
How do I extract content from a DOCX file using Python?
This code uses the python-docx library to read the content of a DOCX file from a dataset and store it in a Document object for further processing.
from transforms.api import transform, Input, Output
import docx as dx
from io import BytesIO


@transform(
    output=Output("output_dataset"),
    docs=Input("input_dataset"),
)
def compute(ctx, docs, output):
    fs = docs.filesystem()
    doc_file = list(fs.ls(regex=r'.*\.docx'))[0]

    # Open the file with the filesystem and read its content into a BytesIO object
    with fs.open(doc_file.path, 'rb') as f:
        source_stream = BytesIO(f.read())
        document = dx.Document(source_stream)
        source_stream.close()

    # Do something with the document object
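As one possible continuation (a sketch, not part of the original snippet), the paragraph text of the document could be collected into a single-column DataFrame inside compute and written to the output:

    # Hypothetical continuation inside compute(): collect the document's paragraph
    # text into a single-column dataframe and write it to the output dataset.
    paragraphs = [p.text for p in document.paragraphs if p.text]
    df = ctx.spark_session.createDataFrame([(text,) for text in paragraphs], ["paragraph_text"])
    output.write_dataframe(df)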
code authoring, python, python-docx, bytesio, raw files, unstructured
How do I read and process a zip file containing multiple CSV files from an input dataset and write the processed data to an output dataset in PySpark?
This code uses PySpark to read and process zip files containing CSVs in an input dataset, skipping the first line of each CSV, and writes the processed data to an output dataset.
from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, configure
import shutil
import tempfile
import zipfile
import io


@transform(
    my_output=Output("my_output_dataset"),
    my_input=Input("my_input_dataset")
)
def compute(ctx, my_output, my_input):
    fs = my_input.filesystem()

    # Function to process each file in the input dataset
    def process_file(file_status):
        with fs.open(file_status.path, 'rb') as f:
            with tempfile.NamedTemporaryFile() as tmp:
                shutil.copyfileobj(f, tmp)
                tmp.flush()

                # Read and process the zip file
                with zipfile.ZipFile(tmp) as archive:
                    for filename in archive.namelist():
                        with archive.open(filename) as f2:
                            br = io.BufferedReader(f2)
                            tw = io.TextIOWrapper(br)
                            tw.readline()  # Skip the first line of each CSV

                            # Read and process each line in the CSV
                            # (MyRow is a pyspark.sql.Row template; see the sketch below)
                            for line in tw:
                                yield MyRow(*line.split(","))

    # Read the input dataset and process each file
    rdd = my_input.filesystem().files().rdd
    rdd = rdd.flatMap(process_file)
    df = rdd.toDF()

    # Write the processed data to the output dataset
    my_output.write_dataframe(df)
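MyRow is not defined in the snippet above. A minimal sketch, assuming each CSV has a fixed, known set of columns (the column names here are placeholders to replace with your own):

from pyspark.sql import Row

# Hypothetical row template for the CSV contents; replace the placeholder
# column names with the actual columns of your CSV files.
MyRow = Row("col_1", "col_2", "col_3")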
code authoring, code repositories, python, zip, csv
How can I unzip a file in a dataset?
This code uses PySpark to read zipped files from an input, extract the contents, and write the extracted files to an output. It does this by iterating through the zipped files, reading their contents into a BytesIO stream, and then using the zipfile library to extract the files to a temporary directory. The extracted files are then written to the output.
from transforms.api import transform, Input, Output
import zipfile
import tempfile
import os
from io import BytesIO


@transform(
    unzipped=Output(""),
    zipped=Input(""),
)
def compute(unzipped, zipped):
    zip_files = zipped.filesystem().files(glob="*.zip").collect()

    for zip_file in zip_files:
        with zipped.filesystem().open(zip_file["path"], 'rb') as zip_f:
            source_stream = BytesIO(zip_f.read())
            with zipfile.ZipFile(source_stream, 'r') as zip_ref:
                with tempfile.TemporaryDirectory() as temp_dir:
                    zip_ref.extractall(temp_dir)
                    for path in iterate_directories(temp_dir):
                        output_file_name = path.replace(temp_dir, "")
                        # Copy each extracted file to the output in binary mode
                        with unzipped.filesystem().open(output_file_name, "wb") as out_f:
                            with open(path, 'rb') as in_f:
                                out_f.write(in_f.read())


def iterate_directories(directory):
    for root, dirs, files in os.walk(directory):
        for file in files:
            path = os.path.join(root, file)
            if is_leaf_file(path):
                yield path


def is_leaf_file(path):
    return os.path.isfile(path) and not os.path.islink(path)
code authoring, code repositories, python, raw files, zip, unzip
How can I create a zip file from a dataset of files?
This code uses the transforms API to read all Markdown files from a source dataset and create a zip file containing these files.
from transforms.api import transform, Input, Output
import zipfile


@transform(
    my_output=Output(""),
    source_df=Input(""),
)
def compute(ctx, my_output, source_df):
    files = source_df.filesystem().files(glob="*.md").collect()

    with my_output.filesystem().open("foundry_code_examples.zip", 'wb') as write_zip:
        with zipfile.ZipFile(write_zip.name, 'w') as zip_file:
            for file_row in files:
                with source_df.filesystem().open(file_row["path"], 'rb') as markdown_file:
                    zip_file.write(markdown_file.name, arcname=file_row["path"])
raw files, zip, python, code authoring, code repositories, export
How can I parse Excel files where the header is complex and consists of multiple rows?
This code demonstrates how to parse Excel files with complex headers using the transforms-excel-parser library. It creates a TableParser with a MultilayerMergedHeaderExtractor, then creates a TransformsExcelParser with the TableParser. Finally, it uses the TransformsExcelParser to extract data from the Excel files in the input dataset and writes the result to the output.
package myproject.datasets;

import com.palantir.transforms.excel.ParseResult;
import com.palantir.transforms.excel.Parser;
import com.palantir.transforms.excel.TransformsExcelParser;
import com.palantir.transforms.excel.table.MultilayerMergedHeaderExtractor;
import com.palantir.transforms.excel.table.TableParser;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;
import java.util.Optional;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public final class ComplexHeaderExcel {
    @Compute
    public void myComputeFunction(
            @Input("<input_dataset_rid>") FoundryInput myInput,
            @Output("<output_dataset_rid>") FoundryOutput myOutput,
            @Output("<error_output_dataset_rid>") FoundryOutput errorOutput) {
        // Create a TableParser with a MultilayerMergedHeaderExtractor
        Parser tableParser = TableParser.builder()
                .headerExtractor(MultilayerMergedHeaderExtractor.builder()
                        .topLeftCellName("A1")
                        .bottomRightCellName("D2")
                        .build())
                .build();

        // Create a TransformsExcelParser with the TableParser
        TransformsExcelParser transformsParser = TransformsExcelParser.of(tableParser);

        // Parse input
        ParseResult result = transformsParser.parse(myInput.asFiles().getFileSystem().filesAsDataset());

        // Get the parsed data, which may be empty if there were no rows in the input
        // or an error occurred
        Optional<Dataset<Row>> maybeDf = result.singleResult();

        // If parsed data is not empty, write it to the output dataset
        maybeDf.ifPresent(df -> myOutput.getDataFrameWriter(df).write());

        // Write error information to the error output
        errorOutput.getDataFrameWriter(result.errorDataframe()).write();
    }
}
code authoring, code repositories, java, transforms-excel-parser, excel
How do I parse Excel files where the data is not tabular?
This code demonstrates how to use the transforms-excel-parser library to extract data from Excel files containing forms across multiple sheets.
package myproject.datasets;

import com.palantir.transforms.excel.ParseResult;
import com.palantir.transforms.excel.Parser;
import com.palantir.transforms.excel.TransformsExcelParser;
import com.palantir.transforms.excel.form.FieldSpec;
import com.palantir.transforms.excel.form.FormParser;
import com.palantir.transforms.excel.form.Location;
import com.palantir.transforms.excel.form.cellvalue.AdjacentCellAssertion;
import com.palantir.transforms.excel.form.cellvalue.CellValue;
import com.palantir.transforms.excel.functions.RegexSubstringMatchingSheetSelector;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;

public final class FormStyleExcel {
    private static final String FORM_A_KEY = "FORM_A";
    private static final String FORM_B_KEY = "FORM_B";

    @Compute
    public void myComputeFunction(
            @Input("<input_dataset_rid>") FoundryInput myInput,
            @Output("<form_a_output_dataset_rid>") FoundryOutput formAOutput,
            @Output("<form_b_output_dataset_rid>") FoundryOutput formBOutput,
            @Output("<error_output_dataset_rid>") FoundryOutput errorOutput) {
        // Form A parser configuration
        Parser formAParser = FormParser.builder()
                .sheetSelector(new RegexSubstringMatchingSheetSelector("Form_A"))
                .addFieldSpecs(createFieldSpec("form_a_field_1", "B1"))
                .addFieldSpecs(createFieldSpec("form_a_field_2", "B2"))
                .build();

        // Form B parser configuration
        Parser formBParser = FormParser.builder()
                .sheetSelector(new RegexSubstringMatchingSheetSelector("Form_B"))
                .addFieldSpecs(createFieldSpec("form_b_field_1", "B1"))
                .addFieldSpecs(createFieldSpec("form_b_field_2", "B2"))
                .build();

        // TransformsExcelParser with both Form A and Form B parsers
        TransformsExcelParser transformsParser = TransformsExcelParser.builder()
                .putKeyToParser(FORM_A_KEY, formAParser)
                .putKeyToParser(FORM_B_KEY, formBParser)
                .build();

        // Parse input
        ParseResult result = transformsParser.parse(myInput.asFiles().getFileSystem().filesAsDataset());

        // Write parsed data to the output datasets
        result.dataframeForKey(FORM_A_KEY)
                .ifPresent(df -> formAOutput.getDataFrameWriter(df).write());
        result.dataframeForKey(FORM_B_KEY)
                .ifPresent(df -> formBOutput.getDataFrameWriter(df).write());

        // Write error information to the error output
        errorOutput.getDataFrameWriter(result.errorDataframe()).write();
    }

    // Helper method to concisely create a FieldSpec with an appropriate assertion
    private static FieldSpec createFieldSpec(String fieldName, String cellLocation) {
        return FieldSpec.of(
                fieldName,
                CellValue.builder()
                        .addAssertions(AdjacentCellAssertion.left(1, fieldName))
                        .location(Location.of(cellLocation))
                        .build());
    }
}
code authoring, code repositories, java, transforms-excel-parser, excel
How do I parse simple tabular Excel files using Transforms Excel Parser?
This code demonstrates how to parse a dataset containing simple tabular Excel files using the transforms-excel-parser library. It creates a TableParser with a SimpleHeaderExtractor, then creates a TransformsExcelParser with the TableParser. Finally, it uses the TransformsExcelParser to parse the files in the input dataset and writes the extracted data to the output dataset.
package myproject.datasets;

import com.palantir.transforms.excel.ParseResult;
import com.palantir.transforms.excel.Parser;
import com.palantir.transforms.excel.TransformsExcelParser;
import com.palantir.transforms.excel.table.SimpleHeaderExtractor;
import com.palantir.transforms.excel.table.TableParser;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;
import java.util.Optional;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public final class SimpleTabularExcel {
    @Compute
    public void myComputeFunction(
            @Input("<input_dataset_rid>") FoundryInput myInput,
            @Output("<output_dataset_rid>") FoundryOutput myOutput,
            @Output("<error_output_dataset_rid>") FoundryOutput errorOutput) {
        // Create a TableParser with an appropriately configured SimpleHeaderExtractor.
        // In this example, the header of the file is on the second row.
        // If the header were on the first row, we would not need to specify rowsToSkip,
        // since the default value is 0; in fact, TableParser.builder().build() would suffice.
        Parser tableParser = TableParser.builder()
                .headerExtractor(
                        SimpleHeaderExtractor.builder().rowsToSkip(1).build())
                .build();

        // Create a TransformsExcelParser with the TableParser
        TransformsExcelParser transformsParser = TransformsExcelParser.of(tableParser);

        // Parse input
        ParseResult result = transformsParser.parse(myInput.asFiles().getFileSystem().filesAsDataset());

        // Get the parsed data, which may be empty if there were no rows in the input
        // or an error occurred
        Optional<Dataset<Row>> maybeDf = result.singleResult();

        // If parsed data is not empty, write it to the output dataset
        maybeDf.ifPresent(df -> myOutput.getDataFrameWriter(df).write());

        // Write error information to the error output
        errorOutput.getDataFrameWriter(result.errorDataframe()).write();
    }
}
code authoring, code repositories, java, transforms-excel-parser, excel