データ統合Java基礎的なトランスフォーム非構造化ファイルの読み書き

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

非構造化ファイルの読み書き

ここでの例はより高度な内容を含んでいます。このセクションを読む前に、Transforms の定義に関するセクションを確認してください。

このページには、Transforms Java を使用したさまざまなデータ変換の例が含まれています。

ここでの例は、ファイルの観点から表現されたデータ変換です。変換中にファイルにアクセスしたい場合、低レベルのTransformを定義する必要があります。これは、基礎となるデータセットファイルがFoundryInputおよびFoundryOutputオブジェクトによって公開されるからです。低レベルの Transforms は、高レベルのものとは異なり、計算機能への入力と出力をFoundryInputおよびFoundryOutputのタイプであることを期待しています。含まれている例も、手動登録を目的とした低レベルの Transforms です。

Spark を使った並列処理

データセットファイルの解凍と出力ファイルシステムへの書き込み

この例では、入力として.zipファイルを受け取ります。ファイルを解凍し、そのファイルを出力ファイルシステムに書き込みます - .zipは分割可能ではないため、この作業は Spark を使用して.zipファイルごとに並列に行われます。単一の圧縮ファイル内で解凍を並列化したい場合は、.bz2のような分割可能なファイル形式を使用してください。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 /* * (c) Copyright 2018 Palantir Technologies Inc. All rights reserved. */ package com.palantir.transforms.java.examples; import com.google.common.io.ByteStreams; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.FoundryInput; import com.palantir.transforms.lang.java.api.FoundryOutput; import com.palantir.transforms.lang.java.api.ReadOnlyLogicalFileSystem; import com.palantir.transforms.lang.java.api.WriteOnlyLogicalFileSystem; import com.palantir.util.syntacticpath.Paths; import java.io.IOException; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; /** * これは、Sparkを使用してファイルを並行して解凍する例です。 * <p> * 作業はエグゼキュータに分散されます。 */ public final class UnzipWithSpark { @Compute public void compute(FoundryInput zipFiles, FoundryOutput output) throws IOException { ReadOnlyLogicalFileSystem inputFileSystem = zipFiles.asFiles().getFileSystem(); WriteOnlyLogicalFileSystem outputFileSystem = output.getFileSystem(); inputFileSystem.filesAsDataset().foreach(portableFile -> { // "processWith" は、指定された入力ファイルの InputStream を提供します。 portableFile.processWithThenClose(stream -> { try (ZipInputStream zis = new ZipInputStream(stream)) { ZipEntry entry; // .zipファイル内の各ファイルを出力ファイルシステムに書き込みます。 while ((entry = zis.getNextEntry()) != null) { outputFileSystem.writeTo( Paths.get(entry.getName()), outputStream -> ByteStreams.copy(zis, outputStream)); } return null; } catch (IOException e) { throw new RuntimeException(e); } }); }); } }

データセットファイルの解凍と出力DataFrameへの書き込み

この例では、入力として.csv.gz、および.zipファイルを取り込みます。ファイルを解凍し、そのファイルを出力DataFrameに書き込む作業がSparkを使用して並行して行われます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 /* * (c) Copyright 2018 Palantir Technologies Inc. All rights reserved. */ package com.palantir.transforms.java.examples; import com.google.common.collect.AbstractIterator; import com.google.common.io.CharSource; import com.google.common.io.Closeables; import com.palantir.spark.binarystream.data.PortableFile; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.FoundryInput; import com.palantir.transforms.lang.java.api.FoundryOutput; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.zip.GZIPInputStream; import java.util.zip.ZipInputStream; import org.apache.spark.TaskContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.util.TaskCompletionListener; /** * これは、入力として.csv/.gz/.zipファイルを想定している例です。 * <p> * Spark上で並列に次の操作を行います: * <ol> * <li>ファイルのタイプを検出します。</li> * <li>ファイルタイプが.gzまたは.zipの場合、ファイルを解凍します。</li> * <li>解凍した.csvファイルのスキーマを推測します。</li> * <li>.csv行のデータセットをそのスキーマのデータセットに変換します。</li> * </ol> */ public final class UnzipWithSparkToDataset { @Compute public void compute(FoundryInput input, FoundryOutput output) { // 入力ファイルのSparkデータセットを取得します。 Dataset<PortableFile> files = input.asFiles().getFileSystem().filesAsDataset(); // 入力ファイルのデータセットを.csv行のデータセットに変換します。 Dataset<String> csvDataset = files.flatMap((FlatMapFunction<PortableFile, String>) portableFile -> // 現在の入力ファイルからInputStreamを取得します。 portableFile.convertToIterator(inputStream -> { String fileName = portableFile.getLogicalPath().getFileName().toString(); // .gzと.zipファイルを検出し、各行からイテレータを取得します。 if (fileName.endsWith(".gz")) { return new InputStreamCharSource(new GZIPInputStream(inputStream)).getLineIterator(); } else if (fileName.endsWith(".zip")) { return new ZipIterator(new ZipInputStream(inputStream)); } else { return new InputStreamCharSource(inputStream).getLineIterator(); } }), Encoders.STRING()); // スキーマを推測し、.csv行のデータセットをそのスキーマのデータセットに変換します。 Dataset<Row> dataset = files .sparkSession() .read() .option("inferSchema", "true") .csv(csvDataset); output.getDataFrameWriter(dataset).write(); } /* * このZipIteratorは、アーカイブ内のすべてのファイルが同じスキーマを持つ.csvであり、 * 同じデータセットに属していると仮定します。 */ private static final class ZipIterator extends AbstractIterator<String> { private Iterator<String> lineIterator; private ZipInputStream zis; ZipIterator(ZipInputStream zis) throws IOException { this.zis = zis; lineIterator = new InputStreamCharSource(zis).getLineIterator(); } @Override protected String computeNext() { if (!lineIterator.hasNext()) { // 行イテレータに次の要素がない場合、次のファイルがあるかどうかを確認します。 try { // 次の非空のファイルを見つけます。 while (zis.getNextEntry() != null) { lineIterator = new InputStreamCharSource(zis).getLineIterator(); if (lineIterator.hasNext()) { break; } } return lineIterator.hasNext() ? lineIterator.next() : endOfData(); } catch (IOException e) { throw new RuntimeException(e); } } else { return lineIterator.next(); } } } private static final class InputStreamCharSource extends CharSource { private final Reader inputStream; private InputStreamCharSource(InputStream inputStream) { this.inputStream = new InputStreamReader(inputStream, StandardCharsets.UTF_8); } @Override public Reader openStream() throws IOException { return inputStream; } @SuppressWarnings("MustBeClosedChecker") Iterator<String> getLineIterator() { try { return super.lines().iterator(); } catch (IOException e) { throw new RuntimeException(e); } finally { if (TaskContext.get() != null) { // Sparkで実行している場合、タスクが終了したらストリームを閉じます。 TaskContext.get().addTaskCompletionListener((TaskCompletionListener) context -> close()); } else { close(); } } } private void close() { Closeables.closeQuietly(inputStream); } } }

データセットファイルの結合

この例では、入力として.zipファイルを取り、すべての入力データセットファイルを単一の.zipファイルに結合します。このトランスフォームでの計算は並列化されていないことに注意してください。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 /* * (c) Copyright 2018 Palantir Technologies Inc. All rights reserved. */ package com.palantir.transforms.java.examples; import com.google.common.io.ByteStreams; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.FoundryInput; import com.palantir.transforms.lang.java.api.FoundryOutput; import com.palantir.transforms.lang.java.api.ReadOnlyLogicalFileSystem; import com.palantir.transforms.lang.java.api.WriteOnlyLogicalFileSystem; import com.palantir.util.syntacticpath.Paths; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; /** * これは、データセット内のすべてのファイルを一つの大きな.zipファイルに結合する例です。 * <p> * 作業は単一のスレッドで行われます、なぜなら並行して実行するのは難しいからです。 * <p> * 警告: 一般的には、Sparkの利点を活かすために {@link UnzipWithSpark} および {@link UnzipWithSparkToDataset} * のAPIを使用することが推奨されます。これは意味のある並列化が難しい計算の例であるため、ドライバー上でのみファイルシステム操作を使用して行われます。 */ public final class ZipOnDriver { @Compute public void compute(FoundryInput zipFiles, FoundryOutput output) { ReadOnlyLogicalFileSystem inputFileSystem = zipFiles.asFiles().getFileSystem(); WriteOnlyLogicalFileSystem outputFileSystem = output.getFileSystem(); // 出力データセットのファイルシステム内の "bigzip.zip" というファイルに書き込みます。 outputFileSystem.writeTo(Paths.get("bigzip.zip"), outputStream -> { // OutputStreamをZipOutputStreamにラップして、各ファイルを同じ.zipファイルに書き込む。 try (ZipOutputStream zipOutputStream = new ZipOutputStream(outputStream)) { // 入力データセットのFileSystemの各ファイルを読み込み、"bigzip.zip"の新しいエントリをマークし、 // バイトをコピーします。 inputFileSystem.listAllFiles().forEach(inputPath -> { inputFileSystem.readFrom(inputPath, inputStream -> { zipOutputStream.putNextEntry(new ZipEntry(inputPath.toString())); ByteStreams.copy(inputStream, zipOutputStream); return null; }); }); } }); } }