Read and write unstructured files

The examples here cover more advanced content. Make sure you have read the section on defining Transforms before working through this page.

This page contains various example data transformations written with the Transforms Java API.

The examples here are data transformations expressed in terms of files. To access files in your transformation, you must define a low-level Transform, because underlying dataset files are exposed through FoundryInput and FoundryOutput objects. Low-level Transforms, unlike high-level ones, expect the input(s) and output(s) of the compute function to be of type FoundryInput and FoundryOutput, respectively. The included examples are also low-level Transforms intended for manual registration.
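Manual registration happens in a PipelineDefiner. As a reference point, the sketch below shows how the first example on this page (UnzipWithSpark, defined further down) might be registered; the dataset paths are hypothetical placeholders, and the builder methods follow the standard manual-registration pattern for low-level Transforms.

/*
 * A minimal sketch of manually registering the "UnzipWithSpark" example below.
 * The dataset paths are hypothetical placeholders.
 */
package com.palantir.transforms.java.examples;

import com.palantir.transforms.lang.java.api.*;

public final class MyPipelineDefiner implements PipelineDefiner {

    @Override
    public void define(Pipeline pipeline) {
        LowLevelTransform unzipTransform = LowLevelTransform.builder()
                // An instance of the compute function to run.
                .computeFunctionInstance(new UnzipWithSpark())
                // Bind the "zipFiles" parameter to an input dataset path.
                .putParameterToInputAlias("zipFiles", "/examples/input_zips")
                // Bind the "output" parameter to an output dataset path.
                .putParameterToOutputAlias("output", "/examples/unzipped")
                .build();
        pipeline.register(unzipTransform);
    }
}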

Parallel processing with Spark

Uncompressing dataset files & writing to an output filesystem

This example takes .zip files as input. It unzips the files and writes their contents to the output FileSystem. Because the .zip format is not splittable, this work is parallelized per .zip file using Spark. If you want to parallelize decompression within a single compressed file, use a splittable file format such as .bz2.

/*
 * (c) Copyright 2018 Palantir Technologies Inc. All rights reserved.
 */
package com.palantir.transforms.java.examples;

import com.google.common.io.ByteStreams;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.ReadOnlyLogicalFileSystem;
import com.palantir.transforms.lang.java.api.WriteOnlyLogicalFileSystem;
import com.palantir.util.syntacticpath.Paths;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

/**
 * This is an example of unzipping files in parallel using Spark.
 * <p>
 * The work is distributed to executors.
 */
public final class UnzipWithSpark {

    @Compute
    public void compute(FoundryInput zipFiles, FoundryOutput output) throws IOException {
        ReadOnlyLogicalFileSystem inputFileSystem = zipFiles.asFiles().getFileSystem();
        WriteOnlyLogicalFileSystem outputFileSystem = output.getFileSystem();

        inputFileSystem.filesAsDataset().foreach(portableFile -> {
            // "processWithThenClose" gives you the InputStream for the given input file.
            portableFile.processWithThenClose(stream -> {
                try (ZipInputStream zis = new ZipInputStream(stream)) {
                    ZipEntry entry;
                    // For each file in the .zip file, write it to the output file system.
                    while ((entry = zis.getNextEntry()) != null) {
                        outputFileSystem.writeTo(
                                Paths.get(entry.getName()),
                                outputStream -> ByteStreams.copy(zis, outputStream));
                    }
                    return null;
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        });
    }
}
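The contrast with splittable formats is worth seeing concretely. With a splittable codec such as .bz2, Spark can divide even a single compressed file across tasks when it reads the file as .csv or text, so no per-file iteration is needed. The following is a minimal, hypothetical illustration; the path is a placeholder, and it uses plain Spark rather than the Foundry file APIs.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public final class SplittableReadSketch {

    public static Dataset<Row> readBz2Csv(SparkSession spark) {
        // Because .bz2 is a splittable codec, Spark can parallelize this read
        // within the single compressed file. The path is a hypothetical placeholder.
        return spark.read()
                .option("inferSchema", "true")
                .csv("/data/large_file.csv.bz2");
    }
}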

Uncompressing dataset files & writing to an output DataFrame

This example takes .csv, .gz, and .zip files as input. It uncompresses the files and writes their contents to the output DataFrame. This work is done in parallel using Spark.

/*
 * (c) Copyright 2018 Palantir Technologies Inc. All rights reserved.
 */

package com.palantir.transforms.java.examples;

import com.google.common.collect.AbstractIterator;
import com.google.common.io.CharSource;
import com.google.common.io.Closeables;
import com.palantir.spark.binarystream.data.PortableFile;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipInputStream;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.util.TaskCompletionListener;

/**
 * This example expects .csv/.gz/.zip files as input.
 * <p>
 * It does the following operations in parallel over Spark:
 * <ol>
 *     <li>Detects the type of each file.</li>
 *     <li>If the file type is .gz or .zip, uncompresses the file.</li>
 *     <li>Infers the schema of the uncompressed .csv files.</li>
 *     <li>Converts the dataset of .csv lines into a dataset of that schema.</li>
 * </ol>
 */
public final class UnzipWithSparkToDataset {

    @Compute
    public void compute(FoundryInput input, FoundryOutput output) {
        // Get a Spark dataset of input files.
        Dataset<PortableFile> files = input.asFiles().getFileSystem().filesAsDataset();

        // Convert the dataset of input files to a dataset of .csv lines.
        Dataset<String> csvDataset = files.flatMap((FlatMapFunction<PortableFile, String>) portableFile ->
                // Get an InputStream from the current input file.
                portableFile.convertToIterator(inputStream -> {
                    String fileName = portableFile.getLogicalPath().getFileName().toString();
                    // Detect .gz and .zip files and get a line iterator from each.
                    if (fileName.endsWith(".gz")) {
                        return new InputStreamCharSource(new GZIPInputStream(inputStream)).getLineIterator();
                    } else if (fileName.endsWith(".zip")) {
                        return new ZipIterator(new ZipInputStream(inputStream));
                    } else {
                        return new InputStreamCharSource(inputStream).getLineIterator();
                    }
                }), Encoders.STRING());

        // Infer the schema and convert the dataset of .csv lines into a dataset of that schema.
        Dataset<Row> dataset = files
                .sparkSession()
                .read()
                .option("inferSchema", "true")
                .csv(csvDataset);
        output.getDataFrameWriter(dataset).write();
    }

    /*
     * This ZipIterator assumes that all files within the archive are .csvs with the
     * same schema and belong to the same dataset.
     */
    private static final class ZipIterator extends AbstractIterator<String> {
        private Iterator<String> lineIterator;
        private ZipInputStream zis;

        ZipIterator(ZipInputStream zis) throws IOException {
            this.zis = zis;
            lineIterator = new InputStreamCharSource(zis).getLineIterator();
        }

        @Override
        protected String computeNext() {
            if (!lineIterator.hasNext()) {
                // If the line iterator does not have a next element, check if there is a next file.
                try {
                    // Find the next file that is non-empty.
                    while (zis.getNextEntry() != null) {
                        lineIterator = new InputStreamCharSource(zis).getLineIterator();
                        if (lineIterator.hasNext()) {
                            break;
                        }
                    }
                    return lineIterator.hasNext() ? lineIterator.next() : endOfData();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            } else {
                return lineIterator.next();
            }
        }
    }

    private static final class InputStreamCharSource extends CharSource {
        private final Reader inputStream;

        private InputStreamCharSource(InputStream inputStream) {
            this.inputStream = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
        }

        @Override
        public Reader openStream() throws IOException {
            return inputStream;
        }

        @SuppressWarnings("MustBeClosedChecker")
        Iterator<String> getLineIterator() {
            try {
                return super.lines().iterator();
            } catch (IOException e) {
                throw new RuntimeException(e);
            } finally {
                if (TaskContext.get() != null) {
                    // If running in Spark, close the stream when the task is finished.
                    TaskContext.get().addTaskCompletionListener((TaskCompletionListener) context -> close());
                } else {
                    close();
                }
            }
        }

        private void close() {
            Closeables.closeQuietly(inputStream);
        }
    }
}
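One follow-up worth noting: option("inferSchema", "true") costs Spark an extra pass over the data. If the schema of the .csv lines is known in advance, it can be supplied explicitly instead. A sketch with hypothetical column names, replacing the schema-inference lines in the compute method above:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical columns; replace with the actual schema of your .csv files.
StructType schema = new StructType()
        .add("id", DataTypes.IntegerType)
        .add("name", DataTypes.StringType);

Dataset<Row> dataset = files
        .sparkSession()
        .read()
        .schema(schema) // skips the extra schema-inference pass
        .csv(csvDataset);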

Combining dataset files

This example takes dataset files as input and combines them all into a single .zip file. Note that the computation in this Transform is not parallelized; it runs on the driver.

/*
 * (c) Copyright 2018 Palantir Technologies Inc. All rights reserved.
 */
package com.palantir.transforms.java.examples;

import com.google.common.io.ByteStreams;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.ReadOnlyLogicalFileSystem;
import com.palantir.transforms.lang.java.api.WriteOnlyLogicalFileSystem;
import com.palantir.util.syntacticpath.Paths;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/**
 * This is an example of combining all files in a dataset into one big .zip file.
 * <p>
 * The work is done on a single thread, because it is difficult to run in parallel.
 * <p>
 * WARNING: In general, it's preferred to use the APIs in {@link UnzipWithSpark} and {@link UnzipWithSparkToDataset}
 * to take advantage of Spark. This is an example of a computation that is difficult to meaningfully parallelize,
 * which is why it is done using file system operations only on the driver.
 */
public final class ZipOnDriver {

    @Compute
    public void compute(FoundryInput zipFiles, FoundryOutput output) {
        ReadOnlyLogicalFileSystem inputFileSystem = zipFiles.asFiles().getFileSystem();
        WriteOnlyLogicalFileSystem outputFileSystem = output.getFileSystem();

        // Write to a file called "bigzip.zip" in the output dataset's file system.
        outputFileSystem.writeTo(Paths.get("bigzip.zip"), outputStream -> {
            // Wrap the OutputStream in a ZipOutputStream in order to write each file into the same .zip file.
            try (ZipOutputStream zipOutputStream = new ZipOutputStream(outputStream)) {
                // For each file in the input dataset's FileSystem, read it, mark a new entry in the
                // "bigzip.zip", then copy the bytes over.
                inputFileSystem.listAllFiles().forEach(inputPath -> {
                    inputFileSystem.readFrom(inputPath, inputStream -> {
                        zipOutputStream.putNextEntry(new ZipEntry(inputPath.toString()));
                        ByteStreams.copy(inputStream, zipOutputStream);
                        return null;
                    });
                });
            }
        });
    }
}
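By contrast, the inverse direction parallelizes cleanly: compressing each input file separately involves no shared output file, so the per-file Spark pattern from UnzipWithSpark applies. The following is a sketch under that assumption, using only APIs shown in the examples above; the class name and the ".gz" path-suffix handling are illustrative.

/*
 * A sketch of compressing each dataset file to .gz in parallel. This reuses the
 * per-file Spark pattern from UnzipWithSpark; the ".gz" suffix handling is illustrative.
 */
package com.palantir.transforms.java.examples;

import com.google.common.io.ByteStreams;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.ReadOnlyLogicalFileSystem;
import com.palantir.transforms.lang.java.api.WriteOnlyLogicalFileSystem;
import com.palantir.util.syntacticpath.Paths;
import java.util.zip.GZIPOutputStream;

public final class GzipWithSpark {

    @Compute
    public void compute(FoundryInput files, FoundryOutput output) {
        ReadOnlyLogicalFileSystem inputFileSystem = files.asFiles().getFileSystem();
        WriteOnlyLogicalFileSystem outputFileSystem = output.getFileSystem();

        // One Spark task per input file: stream each file through a GZIP encoder.
        inputFileSystem.filesAsDataset().foreach(portableFile -> {
            portableFile.processWithThenClose(inputStream -> {
                outputFileSystem.writeTo(
                        Paths.get(portableFile.getLogicalPath().toString() + ".gz"),
                        outputStream -> {
                            try (GZIPOutputStream gzip = new GZIPOutputStream(outputStream)) {
                                return ByteStreams.copy(inputStream, gzip);
                            }
                        });
                return null;
            });
        });
    }
}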