データ統合Java基礎的なトランスフォーム

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

高レベル変換

Javaでのデータ変換では、DataFrameオブジェクトを読み込み、処理し、書き込むことが一般的です。Java APIでは、DataFrameDataset<Row>によって表現されることを思い出してください。データ変換がDataFrameオブジェクトに依存する場合、高レベルのTransformを定義できます。高レベルのTransformは、Dataset<Row>タイプの入力を受け入れ、計算関数がDataset<Row>タイプの単一の出力を返すことを期待します。または、より一般的な低レベルのTransformを定義し、入力データセットを含むDataset<Row>にアクセスするために明示的にasDataFrame()メソッドを呼び出すこともできます。

高レベルのTransformを定義するには、任意の数のDataset<Row>タイプの入力を受け取り、Dataset<Row>タイプの単一の出力を返す計算関数を定義します。

自動登録

以下は、myproject.datasetsパッケージ内にHighLevelAutoTransformというクラスを作成してTransformを定義する方法の例です:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package myproject.datasets; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.Input; import com.palantir.transforms.lang.java.api.Output; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; /** * これは自動登録用の高レベルのTransformの例です。 */ public final class HighLevelAutoTransform { // 自動登録されるTransformのクラスは、compute関数と入力/出力データセットに関する情報を含みます。 // 自動登録には "@Input" と "@Output" のアノテーションが必要です。 @Compute @Output("/path/to/output/dataset") public Dataset<Row> myComputeFunction(@Input("/path/to/input/dataset") Dataset<Row> myInput) { // 高レベルのTransformの計算関数は、"Dataset<Row>" 型の出力を返します。 return myInput.limit(10); } }

ハイレベル・トランスフォームは複数の入力と1つの出力をサポートしています。したがって、各入力パラメーターは、@Input(入力データセットへのフルパスを含む)で注釈付けする必要があり、計算関数は@Output(出力データセットへのフルパスを含む)で注釈付けする必要があります。

これで、このTransformをプロジェクトのPipelineに追加できるようになりました。PipelineDefinerの実装でPipeline.autoBindFromPackage()メソッドを呼び出すことで追加できます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 package myproject; import com.palantir.transforms.lang.java.api.Pipeline; import com.palantir.transforms.lang.java.api.PipelineDefiner; public final class MyPipelineDefiner implements PipelineDefiner { @Override public void define(Pipeline pipeline) { // ここでJavaパッケージを提供して、自動登録したいTransformsを含めます。 pipeline.autoBindFromPackage("myproject.datasets"); } }

マニュアル登録

以下は、myproject.datasets パッケージに HighLevelManualFunction というクラスを作成して Transform を定義する方法の例です:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package myproject.datasets; import com.palantir.transforms.lang.java.api.Compute; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; /** * これは、手動で登録するための高レベルTransformの例の計算関数です。 */ public final class HighLevelManualFunction { // 手動で登録するTransformのクラスは、計算関数だけを含んでいます。 @Compute public Dataset<Row> myComputeFunction(Dataset<Row> myInput) { // 高レベルTransformの計算関数は、"Dataset<Row>"型の出力を返します。 // ここでは、入力データの最初の10行のみを戻り値として返します。 return myInput.limit(10); } }

次に、ユーザーの PipelineDefiner 実装で、HighLevelTransform.builder() を使用してユーザーの Transform の定義を完成させ、この Transform をプロジェクトの Pipeline に追加します。これは Pipeline.register() を呼び出すことで行います:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package myproject; import com.palantir.transforms.lang.java.api.Pipeline; import com.palantir.transforms.lang.java.api.PipelineDefiner; public final class MyPipelineDefiner implements PipelineDefiner { @Override public void define(Pipeline pipeline) { // これは、高レベルのTransformのためのサンプル手動登録です。 HighLevelTransform highLevelManualTransform = HighLevelTransform.builder() // 使用する計算関数を渡します。ここでは、「HighLevelManualFunction」は、 // 高レベルのTransformの計算関数のクラス名に対応しています。 .computeFunctionInstance(new HighLevelManualFunction()) // 使用する入力データセットを渡します。 // "myInput" は、計算関数の入力パラメータに対応しています。 .putParameterToInputAlias("myInput", "/path/to/input/dataset") // 使用する出力データセットを渡します。 .returnedAlias("/path/to/output/dataset") .build(); pipeline.register(highLevelManualTransform); } }

ハイレベルの変換は、複数の入力と単一の出力をサポートします。ユーザーの計算関数の各入力データセットは putParameterToInputAlias() を使用して提供する必要があります。このメソッドでは、ユーザーの計算関数のパラメーターに対応する入力名と、入力データセットへのフルパスが必要です。例えば、上記の例では、「myInput」は my_compute_function() の入力パラメーター名です。出力データセットへのフルパスは returnedAlias() を使用して提供します。

ローレベルの変換

ローレベルの Transform は、DataFrame オブジェクトやファイルに依存するデータ変換を書いている場合に使用できます。

自動登録

自動登録を使用しているとします。以下に、myproject.datasets パッケージに LowLevelAutoTransform というクラスを定義して Transform オブジェクトを作成する例を示します:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package myproject.datasets; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.FoundryInput; import com.palantir.transforms.lang.java.api.FoundryOutput; import com.palantir.transforms.lang.java.api.Input; import com.palantir.transforms.lang.java.api.Output; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; /** * これは自動登録用の低レベル変換の例です。 */ public final class LowLevelAutoTransform { // 自動登録された変換のクラスには、compute // 関数と入力/出力データセットに関する情報が含まれています。 // 自動登録には "@Input" と "@Output" のアノテーションが必要です。 @Compute public void myComputeFunction( @Input("/path/to/input/dataset") FoundryInput myInput, @Output("/path/to/output/dataset") FoundryOutput myOutput) { Dataset<Row> limited = myInput.asDataFrame().read().limit(10); // 低レベル変換の計算関数は、出力データセットに書き込みます。 // 出力を返す代わりに。 myOutput.getDataFrameWriter(limited).write(); } }

低レベルの変換では、複数の入力および出力データセットがサポートされています。したがって、各入力パラメーターは、@Input(入力データセットへの完全なパスを含む)で注釈を付ける必要があり、各出力パラメーターは、@Output(出力データセットへの完全なパスを含む)で注釈を付ける必要があります。

これで、TransformをプロジェクトのPipelineに追加できます。具体的には、PipelineDefiner実装でPipeline.autoBindFromPackage()メソッドを呼び出します。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 package myproject; import com.palantir.transforms.lang.java.api.Pipeline; import com.palantir.transforms.lang.java.api.PipelineDefiner; public final class MyPipelineDefiner implements PipelineDefiner { @Override public void define(Pipeline pipeline) { // 自動的に登録したいTransformsが含まれるJavaパッケージを提供します。 // Provide the Java package containing any Transforms you want to // automatically register. pipeline.autoBindFromPackage("myproject.datasets"); } }

手動登録

たとえば、手動登録を使用しているとしましょう。この場合、ユーザーの計算関数だけを含むクラスを定義します。以下は、myproject.datasets パッケージで LowLevelManualFunction というクラスを定義する方法の例です:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package myproject.datasets; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.FoundryInput; import com.palantir.transforms.lang.java.api.FoundryOutput; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; /** * これは、手動で登録するための低レベル変換のための計算関数の一例です。 */ public final class LowLevelManualFunction { // 手動で登録されたTransformのクラスには、計算関数だけが含まれます。 @Compute public void myComputeFunction(FoundryInput myInput, FoundryOutput myOutput) { Dataset<Row> limited = myInput.asDataFrame().read().limit(10); // 低レベルのTransformの計算関数は、出力データセットに書き込みます。 // 出力を返すのではなく。 myOutput.getDataFrameWriter(limited).write(); } }

これで、ユーザーの PipelineDefiner 実装において、LowLevelTransform.builder() を使用して実際の Transform オブジェクトを作成し、Pipeline.register() を呼び出すことで、この Transform をプロジェクトの Pipeline に追加できます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package myproject; import com.palantir.transforms.lang.java.api.Pipeline; import com.palantir.transforms.lang.java.api.PipelineDefiner; public final class MyPipelineDefiner implements PipelineDefiner { @Override public void define(Pipeline pipeline) { // これは低レベルTransformのサンプルの手動登録です。 LowLevelTransform lowLevelManualTransform = LowLevelTransform.builder() // 使用する計算関数を渡します。ここで、"LowLevelManualFunction" は // 低レベルTransformの計算関数のクラス名に対応します。 .computeFunctionInstance(new LowLevelManualFunction()) // 使用する入力データセットを渡します。 // "myInput" は、計算関数の入力パラメータに対応します。 .putParameterToInputAlias("myInput", "/path/to/input/dataset") // 使用する出力データセットを渡します。 // "myOutput" は、計算関数の入力パラメータに対応します。 .putParameterToOutputAlias("myOutput", "/path/to/output/dataset") .build(); pipeline.register(lowLevelManualTransform); } }

ローレベルトランスフォームでは、複数の入力データセットと出力データセットをサポートしています。ユーザーのコンピューティング関数の各入力データセットは、putParameterToInputAlias() を使用して提供する必要があり、各出力データセットは putParameterToOutputAlias() を使用して提供する必要があります。これらのメソッドには、ユーザーのコンピューティング関数のパラメーターに対応する入力/出力名と、入力/出力データセットへの完全なパスが必要です。例えば、上記の例では、「myInput」および「myOutput」は my_compute_function() 内の入力パラメーター名です。ローレベルトランスフォームのコンピューティング関数は出力データセットに書き込み、値を返さないことを思い出してください。これが、入力/出力データセットがコンピューティング関数にパラメーターとして渡される理由です。