データ統合Java基礎的なトランスフォームトランスフォームとパイプライン

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

トランスフォームとパイプライン

現在、すべての変換はデフォルトでトランザクションタイプ SNAPSHOT になっています。

Transform は、データセットの計算方法を記述したものです。次の内容が記述されています。

  • 入力データセットと出力データセット
  • 入力データセットを出力データセットに変換するためのコード(これを計算関数と呼びます)
  • 追加の設定(実行時に使用するカスタム Transforms プロファイルなど)

入力データセット、出力データセット、変換コードは、Transform オブジェクトに指定され、Pipeline に登録されます。Transform をどのように定義するかは、以下の2つの要素によって決まります。

Transform タイプ

Tip

データ変換は、DataFrame オブジェクトだけでなく、ファイルを用いても表現することができます。これらの DataFrame オブジェクトは、通常の Spark DataFrames を指します。Spark Scala/Java API では、DataFrameDataset で表現されます。したがって、ユーザーは、データ変換コードの中で Dataset オブジェクトを直接操作します。
Spark の操作に関する詳細は、オンラインで利用できる Java API for Spark documentation を参照してください。

DataFrame オブジェクトに依存する変換の場合、次のようにできます。

  • 入力と出力のタイプが Dataset<Row> の高レベル Transform を定義する
  • 低レベル Transform を定義し、入力データセットを含む Dataset<Row> にアクセスするためのメソッドを明示的に呼び出す

ファイルに依存する変換の場合、低レベル Transform を定義し、データセット内のファイルにアクセスする必要があります。

以下は、2種類の Transform の主な違いをまとめたものです。

説明高レベル Transform低レベル Transform
DataFrame オブジェクトに依存するデータ変換が可能✓ *
ファイルへのアクセスに依存するデータ変換が可能
複数の入力データセットをサポート
複数の出力データセットをサポート
計算関数は DataFrame の値を返さなければならない
計算関数は値を返す代わりに出力に書き込む

* DataFrame オブジェクトに依存するデータ変換には、高レベル Transform の使用をお勧めします。

どちらの Transform タイプでも、計算関数を含むクラスを作成する必要があります。このクラス内で、計算関数は、@Compute でアノテーションされた public で non-static なメソッドでなければなりません。このアノテーションがないと、データ変換コードが正しく登録されません。

登録タイプ

リポジトリ内の各 Transforms Java サブプロジェクトは、単一の Pipeline オブジェクトを公開します。この Pipeline オブジェクトは、以下の目的で使用されます。

  1. Foundry にデータセットを登録し、それらを構築する方法を指示する
  2. Foundry ビルド中に、特定のデータセットを構築するための Transform オブジェクトを検索および実行する

エントリポイント

Java 変換を実行するためのランタイムは、プロジェクトの Pipeline を見つけることができる必要があります。Transforms Java は、サービスローディングのための標準 Java 機能 を使用していることに注意してください。

プロジェクトに関連付けられた Pipeline オブジェクトを定義するためには、PipelineDefiner オブジェクトを実装する必要があります。この PipelineDefiner オブジェクトでは、プロジェクトの Pipeline に Transform を追加できます。具体的には、各 Java サブプロジェクトは、1つの PipelineDefiner オブジェクトを実装する必要があります。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 package myproject; import com.palantir.transforms.lang.java.api.Pipeline; import com.palantir.transforms.lang.java.api.PipelineDefiner; public final class MyPipelineDefiner implements PipelineDefiner { @Override public void define(Pipeline pipeline) { // ここにコードを追加して、プロジェクトのパイプラインにTransformsを追加します。 // 自動または手動登録を使用して。 } }

Java パッケージを作成し、PipelineDefiner オブジェクトを実装したら、resources/META-INF/services/com.palantir.transforms.lang.java.api.PipelineDefiner を更新して、ユーザーの PipelineDefiner 実装を指すようにする必要があります。

Copied!
1 2 3 4 // あなたの "PipelineDefiner" 実装のクラス名にこれを置き換えてください。 // 各Javaサブプロジェクトが単一の "PipelineDefiner" を実装しているため、このファイルは // 単一のエントリしか含むことができません。 myproject.MyPipelineDefiner

MyPipelineDefiner は、ユーザーの PipelineDefiner 実装のクラス名を指します。

パイプラインに変換を追加する

プロジェクトのパイプラインと関連付けられた Transform がデータセットを出力として宣言すると、Foundry でこのデータセットを構築できます。Transform オブジェクトを Pipeline に追加する2つの推奨方法は、手動登録自動登録です。

ヒント

より高度なワークフローがある場合や、明示的に各 Transform オブジェクトをプロジェクトのパイプラインに追加したい場合は、手動登録を使用できます。例えば、同じデータ変換ロジックを複数の入力および出力データセットの組み合わせにメタプログラム的に適用したい場合に、手動登録を使用すると便利です。

それ以外の場合、登録コードが簡潔で包括的になるように、自動登録を使用することを強くお勧めします。自動登録では、Pipeline.autoBindFromPackage() がパッケージ内の Transform 定義を検出します(これらのオブジェクトに必要な @Input および @Output アノテーションがある場合のみ)。

自動登録

プロジェクトの複雑さが増すにつれて、Transform オブジェクトを手動で Pipeline に追加することは面倒になることがあります。したがって、Pipeline オブジェクトは、Java パッケージ内のすべての Transform オブジェクトを検出するための autoBindFromPackage() メソッドを提供します。自動登録を使用するには、次の操作を行う必要があります。

  • Transform に対応するクラスを定義します。自動登録では、入力および出力データセットに関する情報と計算機能が含まれるクラスを定義します。
  • 適切な @Input および @Output アノテーションを追加します。
  • 提供された Java パッケージの Transform 定義を登録するために、Pipeline.autoBindFromPackage() メソッドを呼び出します。autoBindFromPackage() メソッドは、必要なアノテーションがある Transform 定義のみを登録します。必要なアノテーションがない Transforms は、提供された Java パッケージに存在していても、プロジェクトの Pipeline には追加されません。

手動登録

Transform オブジェクトは、Pipeline.register() メソッドを使用して手動で Pipeline に追加できます。このメソッドを呼び出すことで、1つの Transform を登録できます。Transforms を手動登録で使用するには、次の操作を行う必要があります。

  • Transform オブジェクトの計算機能を含むクラスを定義します。手動登録では、PipelineDefiner 実装内で入力および出力データセットに関する情報を提供しますが、自動登録とは異なります。
  • HighLevelTransform.builder() または LowLevelTransform.builder() を使用して、使用する計算機能を指定し、入力および出力データセットを提供します。
  • Pipeline.register() メソッドを呼び出して、明示的にプロジェクトのパイプラインに Transform 定義を追加します。
警告

@StopProgagating@StopRequiring などのアノテーションは、自動登録された Java 変換でのみサポートされていることに注意してください。

Transform コンテキスト

データ変換が入力データセット以外のものに依存する場合があります。 たとえば、現在の Spark セッションにアクセスするか、jobSpec の変換パラメーターにアクセスする必要がある変換が必要になる場合があります。このような場合、TransformContext オブジェクトを変換に挿入できます。これを行うには、計算機能が TransformContext タイプのパラメーターを受け入れる必要があります。TransformContext には、Transforms の authHeader、Spark セッション、変換パラメーター、および ServiceDiscovery オブジェクトが含まれます。ServiceDiscovery クラスは、検出された Foundry サービスのサービス URI を公開します。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package myproject.datasets; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.Input; import com.palantir.transforms.lang.java.api.Output; import com.palantir.transforms.lang.java.api.TransformContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; /** * これは、TransformContext にアクセスする例としての高レベルな Transform です */ @Compute public Dataset<Row> myComputeFunction(Dataset<Row> myInput, TransformContext context) { // TransformContext から "limit" パラメータを取得し、整数に変換 int limit = (int) context.parameters().get("limit"); // 入力データセットの行数を制限 return myInput.limit(limit); }
Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package myproject.datasets; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.Input; import com.palantir.transforms.lang.java.api.Output; import com.palantir.transforms.lang.java.api.TransformContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; /** * これは、TransformContextにアクセスする低レベルのTransformの例です */ @Compute public void compute(FoundryInput input, FoundryOutput output, TransformContext context) { // コンテクストからパラメータを取得し、"limit"というキーで取得した値を整数にキャストします int limit = (int) context.parameters().get("limit"); // 入力からデータフレームを読み込み、上で設定したlimitでデータの上限を設定し、結果を出力します output.getDataFrameWriter(input.asDataFrame().read().limit(limit)).write(); }