Lightweight transforms can process files from datasets that do not have a schema. To do so, list the dataset's files with my_input.filesystem().ls().
The .filesystem().ls() statement is available on datasets without schemas; the .path(), .pandas(), .polars(), .arrow(), and .filesystem().files() statements are only available on datasets with schemas.
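For example, a transform can simply enumerate the files of a schemaless input before deciding how to parse them. The sketch below is illustrative only: the input and output names are placeholders, and it assumes the same @transform.using(...) decorator style and write_table(...) output call used in the examples that follow.

```python
from transforms.api import transform, Input, Output
import polars as pl


# Placeholder input/output names; any dataset without a schema works as the input
@transform.using(my_input=Input("my-input"), my_output=Output("my-output"))
def list_files(my_input, my_output):
    fs = my_input.filesystem()
    # Collect the logical path of every file in the schemaless input dataset
    paths = [dataset_file.path for dataset_file in fs.ls()]
    # Write the listing as a single-column table
    my_output.write_table(pl.DataFrame({"path": paths}))
```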
The following code shows an example lightweight transform that processes files of a dataset without a schema.
```python
from transforms.api import incremental, Input, Output, transform
import polars as pl
from concurrent.futures import ThreadPoolExecutor


@incremental()
@transform.using(my_input=Input("my-input"), my_output=Output("my-output"))
def my_incremental_transform(my_input, my_output):
    fs = my_input.filesystem()
    files = list(fs.ls(glob="*.csv"))

    def process_file(dataset_file):
        file_path = dataset_file.path
        # Access the file
        with fs.open(file_path, "rb") as f:
            # <do something with the file> and return some data as a dataframe,
            # for example by reading the CSV into a Polars dataframe:
            return pl.read_csv(f)

    with ThreadPoolExecutor() as executor:
        polars_dataframes = list(executor.map(process_file, files))

    # Union all the dataframes into one
    combined_df = pl.concat(polars_dataframes)

    my_output.write_table(combined_df)
```
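In this example, reading each file is largely I/O-bound, so the ThreadPoolExecutor opens and parses several files concurrently; pl.concat then unions the per-file Polars dataframes into a single table that is written to the output. Note that pl.concat expects the per-file dataframes to share the same schema.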
The following example demonstrates how to parse Excel files:
```python
from transforms.api import transform, Input, Output
import tempfile
import shutil
import polars as pl
import pandas as pd
from concurrent.futures import ThreadPoolExecutor


@transform.spark.using(
    my_output=Output("/path/tabular_output_dataset"),
    my_input=Input("/path/input_dataset_without_schema"),
)
def compute(my_input, my_output):
    # Open the Excel file at the provided path, using the provided filesystem,
    # and parse it into a Polars dataframe
    def read_excel_to_polars(fs, file_path):
        with fs.open(file_path, "rb") as f:
            with tempfile.TemporaryFile() as tmp:
                # Copy the file from the source dataset to the local filesystem
                shutil.copyfileobj(f, tmp)
                tmp.flush()  # shutil.copyfileobj does not flush
                tmp.seek(0)  # Rewind the temporary file before reading it
                # Read the Excel file (the temporary file is seekable)
                pandas_df = pd.read_excel(tmp)
                # Convert all columns to strings to avoid mixed-type columns
                pandas_df = pandas_df.astype(str)
                # Convert the pandas dataframe to a Polars dataframe
                return pl.from_pandas(pandas_df)

    fs = my_input.filesystem()
    # List the paths of all files in the input dataset
    files = [f.path for f in fs.ls()]

    def process_file(file_path):
        return read_excel_to_polars(fs, file_path)

    def union_polars_dataframes(dfs):
        return pl.concat(dfs)

    # Parse the files concurrently, then union all the dataframes into one
    with ThreadPoolExecutor() as executor:
        polars_dataframes = list(executor.map(process_file, files))

    combined_df = union_polars_dataframes(polars_dataframes)
    my_output.write_table(combined_df)
```
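Each file is copied into a tempfile.TemporaryFile() before parsing because Excel readers need a seekable stream, and the stream returned by fs.open() may not be seekable; the local copy is rewound and then handed to pd.read_excel. Parsing .xlsx files with pd.read_excel typically also requires an Excel engine such as openpyxl to be available in the transform's environment.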