Read and write unstructured files

Warning

Unstructured file access is an advanced topic. Ensure you are familiar with the rest of the content in this user guide before reading this page.

You may want to access files in a data transformation for a variety of reasons. File access is particularly useful if you want to process files in non-tabular formats (such as XML or JSON) or compressed formats (such as gz or zip).

The transforms Python library allows users to read and write files in Foundry datasets. transforms.api.TransformInput exposes a read-only FileSystem object while transforms.api.TransformOutput exposes a write-only FileSystem object. These FileSystem objects allow file access based on the path of a file within the Foundry dataset, abstracting away the underlying storage.

If you want to access files in your data transformation, you must construct your Transform object using the transform() decorator. This is because FileSystem objects are exposed by TransformInput and TransformOutput objects, and transform() is the only decorator that passes inputs and outputs of these types to its compute function.
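For example, a minimal sketch of such a Transform (the dataset paths are placeholders) shows how the compute function receives TransformInput and TransformOutput objects that expose both file-based and tabular access:

from transforms.api import transform, Input, Output

@transform(
    raw=Input('/examples/raw_files'),          # placeholder input dataset
    parsed=Output('/examples/parsed_output'),  # placeholder output dataset
)
def my_compute_function(raw, parsed):
    input_fs = raw.filesystem()      # read-only FileSystem of the input
    output_fs = parsed.filesystem()  # write-only FileSystem of the output
    df = raw.dataframe()             # tabular access is still available if the dataset has a schema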

Importing files

Files can be uploaded into Foundry using manual file imports or synced via a data connection. Both structured and unstructured files can be imported into Foundry datasets to be processed in downstream applications. Files can also be uploaded as raw files without modifying the extension. The examples below refer to files uploaded as Foundry datasets, rather than as raw files.

Foundry can also automatically infer a schema for certain file types uploaded to a dataset. For example, when importing a CSV file, the Apply a schema button is available to apply a schema automatically. Learn more about manually uploading data.

Browsing files

Files in a dataset can be listed using the transforms.api.FileSystem.ls() method. This method returns a generator of transforms.api.FileStatus objects. These objects capture the path, size (in bytes), and modified timestamp (milliseconds since Unix epoch) for each file. Consider the following Transform object:

from transforms.api import transform, Input, Output

@transform(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_eye_color(hair_eye_color, processed):
    # type: (TransformInput, TransformOutput) -> None
    # your data transformation code
    pass

In your data transformation code, you can browse your dataset files:

list(hair_eye_color.filesystem().ls())
# Result: [FileStatus(path='students.csv', size=688, modified=...)]

It is also possible to filter the results of the ls() call by passing either a glob or a regex pattern:

list(hair_eye_color.filesystem().ls(glob='*.csv'))
# Result: [FileStatus(path='students.csv', size=688, modified=...)]

list(hair_eye_color.filesystem().ls(regex='[A-Z]*\.csv'))
# Result: []
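Since ls() yields FileStatus objects, their fields can be used directly; for instance, a minimal sketch that aggregates the size and latest modified timestamp of the CSV files in the input:

csv_files = list(hair_eye_color.filesystem().ls(glob='*.csv'))
total_size_bytes = sum(f.size for f in csv_files)        # sum of file sizes in bytes
latest_modified_ms = max(f.modified for f in csv_files)  # most recent modified timestamp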

Reading files

Files can be opened using the transforms.api.FileSystem.open() method. This returns a Python file-like stream object; all options accepted by io.open() are also supported. Note that files are read as streams: the returned stream object does not support the seek or tell methods, so random access is not supported.

Consider the following Transform object:

from transforms.api import transform, Input, Output

@transform(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_eye_color(hair_eye_color, processed):
    # type: (TransformInput, TransformOutput) -> None
    # your data transformation code
    pass

In your data transformation code, you can read your dataset files:

with hair_eye_color.filesystem().open('students.csv') as f:
    f.readline()

# Result: 'id,hair,eye,sex\n'
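As noted above, the options accepted by io.open() are passed through, so you can, for example, open a file in binary mode or specify an encoding explicitly (the encoding shown here is an assumption about the data):

# Binary mode, as used elsewhere on this page
with hair_eye_color.filesystem().open('students.csv', 'rb') as f:
    raw_bytes = f.read()

# Text mode with an explicit encoding (assumed to be UTF-8)
with hair_eye_color.filesystem().open('students.csv', encoding='utf-8') as f:
    header = f.readline()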

The stream can also be passed into parsing libraries. For instance, we can parse a CSV file:

import csv

with hair_eye_color.filesystem().open('students.csv') as f:
    reader = csv.reader(f, delimiter=',')
    next(reader)

# Result: ['id', 'hair', 'eye', 'sex']

As mentioned earlier, you can also process files in non-tabular formats (such as XML or JSON) or compressed formats (such as gz or zip). For instance, you can read the CSVs inside a zipped file and return their contents as a DataFrame with the code below:

import io
import shutil
import tempfile
import zipfile

# fs is assumed to be a dataset FileSystem (for example, hair_eye_color.filesystem()),
# and MyRow a pyspark.sql.Row type constructed from the CSV header.
def process_file(file_status):
    with fs.open(file_status.path, 'rb') as f:
        with tempfile.NamedTemporaryFile() as tmp:
            # Buffer the non-seekable stream to a local file so zipfile can seek within it
            shutil.copyfileobj(f, tmp)
            tmp.flush()
            with zipfile.ZipFile(tmp) as archive:
                for filename in archive.namelist():
                    with archive.open(filename) as f2:
                        br = io.BufferedReader(f2)
                        tw = io.TextIOWrapper(br)
                        tw.readline()  # Skip the first line of each CSV
                        for line in tw:
                            yield MyRow(*line.split(","))

rdd = fs.files().rdd
rdd = rdd.flatMap(process_file)
df = rdd.toDF()
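The same approach applies to other non-tabular formats. For instance, a minimal sketch that parses a JSON file with the standard json module (the file name is hypothetical):

import json

# 'students.json' is a hypothetical file in the input dataset
with hair_eye_color.filesystem().open('students.json') as f:
    records = json.load(f)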

Random access

Warning

Using random access leads to significant performance degradation. We recommend rewriting your code so that it does not rely on the seek method. If you still want to use random access, refer to the examples below.

Since the open() method returns stream objects, random access is not supported. If you need to have random access, you can buffer the file into memory or onto disk. Assuming hair_eye_color corresponds to a TransformInput object, here are some examples:

import io
import shutil

s = io.StringIO()
with hair_eye_color.filesystem().open('students.csv') as f:
    shutil.copyfileobj(f, s)

s.getvalue()
# Result: 'id,hair,eye,sex\n...'
with hair_eye_color.filesystem().open('students.csv') as f:
    lines = f.read().splitlines()

lines[0]
# Result: 'id,hair,eye,sex'
import tempfile

with tempfile.NamedTemporaryFile() as tmp:
    with hair_eye_color.filesystem().open('students.csv', 'rb') as f:
        shutil.copyfileobj(f, tmp)
        tmp.flush()  # shutil.copyfileobj does not flush
    with open(tmp.name) as t:
        t.readline()
        # Result: 'id,hair,eye,sex\n'

Writing files

Files are written in a similar way using the open() method. This returns a Python file-like stream object that can only be written to. All keyword arguments accepted by io.open() are also supported. Note that files are written as streams, meaning that random access is not supported. Consider the following Transform object:

from transforms.api import transform, Input, Output

@transform(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_eye_color(hair_eye_color, processed):
    # type: (TransformInput, TransformOutput) -> None
    # your data transformation code
    pass

In your data transformation code, you can write to the output filesystem. In the following example, you persist a model using the pickle module, Python's built-in serializer:

import pickle

with processed.filesystem().open('model.pickle', 'wb') as f:
    pickle.dump(model, f)
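Text files can be written in the same way; for instance, a minimal sketch that writes a small summary file to the output dataset (the file name and contents are illustrative):

# 'summary.txt' is an illustrative file name
with processed.filesystem().open('summary.txt', 'w') as f:
    f.write('students_processed: 42\n')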

Distributed processing

Unlike data transformations expressed in terms of DataFrame objects, file-based transformations require an understanding of the difference between driver and executor code. The compute function is executed on the driver (a single machine), with Spark automatically distributing DataFrame operations to the executors (many machines) as it sees fit.

To benefit from distributed processing with the files API, we have to leverage Spark to distribute the computation. To do so, we create a DataFrame of FileStatus and distribute this across our executors. Each task on the executor can then open the file that it has been assigned and process it with the results being aggregated by Spark.

The files API exposes the files() function, which accepts the same arguments as ls() but returns a DataFrame of FileStatus objects. This DataFrame is partitioned by file size to help balance the computation when file sizes vary. The partitioning can be controlled using two Spark configuration options:

  • spark.sql.files.maxPartitionBytes is the maximum number of bytes to pack into a single partition when reading files.

  • spark.sql.files.openCostInBytes is the estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. This is added to the file size to calculate the total number of bytes used by the file in the partition.

To modify the values of these properties, you must create a custom Transforms profile and apply it to your Transform using the configure() decorator. For more information, refer to the section on defining Transforms profiles in the Code Repositories documentation.

Now, let's step through an example. Say we have CSV files that we want to parse and concatenate. We use flatMap() to apply a processing function to each FileStatus object. This processing function must yield rows that are compatible with pyspark.sql.SparkSession.createDataFrame().

import csv
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType
from transforms.api import transform, Input, Output


@transform(
    processed=Output('/examples/hair_eye_color_processed'),
    hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
)
def example_computation(hair_eye_color, processed):

    def process_file(file_status):
        with hair_eye_color.filesystem().open(file_status.path) as f:
            r = csv.reader(f)

            # Construct a pyspark.Row from our header row
            header = next(r)
            MyRow = Row(*header)

            for row in r:
                yield MyRow(*row)

    schema = StructType([
        StructField('student_id', StringType(), True),
        StructField('hair_color', StringType(), True),
        StructField('eye_color', StringType(), True),
    ])

    files_df = hair_eye_color.filesystem().files('**/*.csv')
    processed_df = files_df.rdd.flatMap(process_file).toDF(schema)
    processed.write_dataframe(processed_df)
Warning

Although it is possible to call toDF() without passing a schema, if your file processing returns zero rows, Spark's schema inference will fail and throw a ValueError: RDD is empty exception. We therefore recommend that you always specify a schema explicitly.