注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
以下は、トランスフォームについてよくある質問です。
一般的な情報については、トランスフォームのドキュメンテーションをご覧ください。
transforms-python
で Parquet を保存する代わりに CSV ファイルを保存することは可能ですか?transforms-python
で Parquet を保存する代わりに CSV ファイルを保存することは可能ですか?以下は、各トランスフォーム言語でこれを行う方法の例です:
Java
Copied!1 2 3 4 5 6
// foundryOutputのgetDataFrameWriterメソッドを使用してdataFrameからデータを取得します foundryOutput.getDataFrameWriter(dataFrame) // setFormatSettingsメソッドを使用して、データセットのフォーマットを設定します。ここでは"csv"を設定しています .setFormatSettings(DatasetFormatSettings.builder().format("csv").build()) // writeメソッドを使用して、設定したフォーマットでデータを書き出します .write();
Python
Copied!1 2 3 4 5 6 7 8 9 10 11 12
# 日本語のコメントを追加 from transforms.api import transform, Input, Output # transformデコレータを使用して、入力と出力を指定 @transform( output=Output("/path/to/python_csv"), # 出力先のパス my_input=Input("/path/to/input") # 入力ファイルのパス ) # 計算処理を行う関数 def my_compute_function(output, my_input): # 入力データをデータフレーム形式で読み込み、csv形式で出力 output.write_dataframe(my_input.dataframe(), output_format="csv")
SQL
Copied!1 2
-- テーブルを作成する CREATE TABLE `/path/to/sql_csv` USING CSV AS SELECT * FROM `/path/to/input`
複数のトランスフォーム/データセットが必要な場合、for
ループを使用して作成できます:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
from transforms.api import transforms_df, Input, Output def transform_generator(sources): #type: (List[str]) -> List([transforms.api.Transform]) transforms = [] # この例では複数の入力データセットが使用されています。一つの入力データセットから複数の出力を生成することもできます。 for source in sources: @transforms_df( Output('/sources/{source}/output'.format(source=source)), my_input=Input('/sources/{source}/input'.format(source=source)) ) def compute_function(my_input, source=source): # 関数内でsource変数をキャプチャするには、デフォルトのキーワード引数として渡します。 return my_input.filter(my_input.source == source) transforms.append(compute_function) return transforms # 'src1', 'src2', 'src3' というソースから変換を生成します。 TRANSFORMS = transforms_generator(['src1', 'src2', 'src3'])
モジュールの TRANSFORMS
属性をインポートして、各トランスフォームをユーザーのパイプラインに手動で追加することができます:
Copied!1 2 3 4 5 6 7
# my_moduleをインポートします import my_module # Pipelineインスタンスを作成します my_pipeline = Pipeline() # my_moduleから変換を追加します my_pipeline.add_transforms(*my_module.TRANSFORMS)
1つのトランスフォームで、1つの入力を受け取り、同じビルド内で複数のデータセットを出力するには、以下のようにプログラムで実行することもできます:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
# `/examples/students_hair_eye_color`データセットを使用 students_input = foundry.input('/examples/students_hair_eye_color') students_input.dataframe().sort('id').show(n=3) +---+-----+-----+----+ | id| hair| eye| sex| +---+-----+-----+----+ | 1|Black|Brown|Male| | 2|Brown|Brown|Male| | 3| Red|Brown|Male| +---+-----+-----+----+ # この例では上位3行のみを表示しています。 from transforms.api import transform, Input, Output @transform( hair_eye_color=Input('/examples/students_hair_eye_color'), # 入力データセットを指定 males=Output('/examples/hair_eye_color_males'), # 男性のデータを出力するパスを指定 females=Output('examples/hair_eye_color_females'), # 女性のデータを出力するパスを指定 ) def brown_hair_by_sex(hair_eye_color, males, females): # type: (TransformInput, TransformOutput, TransformOutput) -> None # ブラウンヘアのデータフレームをフィルタリング brown_hair_df = hair_eye_color.dataframe().filter(hair_eye_color.dataframe().hair == 'Brown') # 性別でデータフレームをフィルタリングし、それぞれの出力パスに書き込む males.write_dataframe(brown_hair_df.filter(brown_hair_df.sex == 'Male')) # 男性のデータを書き込む females.write_dataframe(brown_hair_df.filter(brown_hair_df.sex == 'Female')) # 女性のデータを書き込む
トランスフォームに関する詳しいヘルプや情報は、以下のドキュメンテーションをご確認ください:
トランスフォームへの入力は、ストリームによってバックアップされたファイルのようなオブジェクトであるため、それをファイルとして処理できます。これは、メモリ全体にファイルを読み込んだり、ディスクにコピーしたりすることを心配する必要がないことを意味し、より大きなファイルの使用を可能にします。
Python 3 に含まれる gzip
および io
パッケージを使用します:
Copied!1 2 3 4 5 6 7 8 9 10 11
import gzip, io def process_file(file_stauts): # 入力データセットのファイルシステムを取得 fs = input_dataset.filesystem() # ファイルをバイナリモードで開く with fs.open(file_status.path, 'rb') as f: # gzipファイルを解凍 gz = gzip.GzipFile(fileobj=f) # バッファリングされたリーダーを作成 br = io.BufferedReader(gz)
そして、読み取りが文字列を返すようにしたい場合は、それをラップできます:
Copied!1 2
# io.TextIOWrapperを使用して、バイナリーファイルをテキストファイルとして操作します。 tw = io.TextIOWrapper(br)
ファイルにエンコーディングがある場合、それを指定できます:
Copied!1 2 3
# 以下のコードは、バイナリデータをテキストデータに変換するために、 # エンコーディング 'CP500' を使用して、io.TextIOWrapper オブジェクトを作成します。 tw = io.TextIOWrapper(br, encoding='CP500')
トランスフォームの詳しい情報やヘルプについては、以下のドキュメントをご覧ください。
Java と Spark を使用して、ZIPアーカイブ内の各ファイルを並列化された方法で解凍します。 1つの圧縮ファイル内の解凍を並列化したい場合は、.bz2
のような分割可能なファイル形式を使用してください。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
package com.palantir.transforms.java.examples; import com.google.common.io.ByteStreams; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.FoundryInput; import com.palantir.transforms.lang.java.api.FoundryOutput; import com.palantir.transforms.lang.java.api.ReadOnlyLogicalFileSystem; import com.palantir.transforms.lang.java.api.WriteOnlyLogicalFileSystem; import com.palantir.util.syntacticpath.Paths; import java.io.IOException; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; /** * これは、Sparkを使用してファイルを並行して解凍する例です。 * <p> * 作業はエグゼキュータに分散されます。 */ public final class UnzipWithSpark { @Compute public void compute(FoundryInput zipFiles, FoundryOutput output) throws IOException { ReadOnlyLogicalFileSystem inputFileSystem = zipFiles.asFiles().getFileSystem(); WriteOnlyLogicalFileSystem outputFileSystem = output.getFileSystem(); inputFileSystem.filesAsDataset().foreach(portableFile -> { // "processWith"は、指定された入力ファイルのInputStreamを提供します。 portableFile.processWithThenClose(stream -> { try (ZipInputStream zis = new ZipInputStream(stream)) { ZipEntry entry; // zipファイル内の各ファイルを出力ファイルシステムに書き込みます。 while ((entry = zis.getNextEntry()) != null) { outputFileSystem.writeTo( Paths.get(entry.getName()), outputStream -> ByteStreams.copy(zis, outputStream)); } return null; } catch (IOException e) { throw new RuntimeException(e); } }); }); } }
変換に関するより詳しいヘルプや情報については、以下のドキュメンテーションを参照してください: