注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
現在、すべての変換はデフォルトでトランザクションタイプ SNAPSHOT
になっています。
Transform
は、データセットの計算方法を記述したものです。次の内容が記述されています。
入力データセット、出力データセット、変換コードは、Transform
オブジェクトに指定され、Pipeline
に登録されます。Transform
をどのように定義するかは、以下の2つの要素によって決まります。
データ変換は、DataFrame
オブジェクトだけでなく、ファイルを用いても表現することができます。これらの DataFrame
オブジェクトは、通常の Spark DataFrames を指します。Spark Scala/Java API では、DataFrame
は Dataset
で表現されます。したがって、ユーザーは、データ変換コードの中で Dataset
オブジェクトを直接操作します。
Spark の操作に関する詳細は、オンラインで利用できる Java API for Spark documentation を参照してください。
DataFrame
オブジェクトに依存する変換の場合、次のようにできます。
Dataset<Row>
の高レベル Transform を定義するDataset<Row>
にアクセスするためのメソッドを明示的に呼び出すファイルに依存する変換の場合、低レベル Transform を定義し、データセット内のファイルにアクセスする必要があります。
以下は、2種類の Transform の主な違いをまとめたものです。
説明 | 高レベル Transform | 低レベル Transform |
---|---|---|
DataFrame オブジェクトに依存するデータ変換が可能 | ✓ * | ✓ |
ファイルへのアクセスに依存するデータ変換が可能 | ✓ | |
複数の入力データセットをサポート | ✓ | ✓ |
複数の出力データセットをサポート | ✓ | |
計算関数は DataFrame の値を返さなければならない | ✓ | |
計算関数は値を返す代わりに出力に書き込む | ✓ |
* DataFrame
オブジェクトに依存するデータ変換には、高レベル Transform の使用をお勧めします。
どちらの Transform
タイプでも、計算関数を含むクラスを作成する必要があります。このクラス内で、計算関数は、@Compute
でアノテーションされた public で non-static なメソッドでなければなりません。このアノテーションがないと、データ変換コードが正しく登録されません。
リポジトリ内の各 Transforms Java サブプロジェクトは、単一の Pipeline
オブジェクトを公開します。この Pipeline
オブジェクトは、以下の目的で使用されます。
Transform
オブジェクトを検索および実行するJava 変換を実行するためのランタイムは、プロジェクトの Pipeline
を見つけることができる必要があります。Transforms Java は、サービスローディングのための標準 Java 機能 を使用していることに注意してください。
プロジェクトに関連付けられた Pipeline
オブジェクトを定義するためには、PipelineDefiner
オブジェクトを実装する必要があります。この PipelineDefiner
オブジェクトでは、プロジェクトの Pipeline に Transform を追加できます。具体的には、各 Java サブプロジェクトは、1つの PipelineDefiner
オブジェクトを実装する必要があります。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13
package myproject; import com.palantir.transforms.lang.java.api.Pipeline; import com.palantir.transforms.lang.java.api.PipelineDefiner; public final class MyPipelineDefiner implements PipelineDefiner { @Override public void define(Pipeline pipeline) { // ここにコードを追加して、プロジェクトのパイプラインにTransformsを追加します。 // 自動または手動登録を使用して。 } }
Java パッケージを作成し、PipelineDefiner
オブジェクトを実装したら、resources/META-INF/services/com.palantir.transforms.lang.java.api.PipelineDefiner
を更新して、ユーザーの PipelineDefiner
実装を指すようにする必要があります。
Copied!1 2 3 4
// あなたの "PipelineDefiner" 実装のクラス名にこれを置き換えてください。 // 各Javaサブプロジェクトが単一の "PipelineDefiner" を実装しているため、このファイルは // 単一のエントリしか含むことができません。 myproject.MyPipelineDefiner
MyPipelineDefiner
は、ユーザーの PipelineDefiner
実装のクラス名を指します。
プロジェクトのパイプラインと関連付けられた Transform
がデータセットを出力として宣言すると、Foundry でこのデータセットを構築できます。Transform
オブジェクトを Pipeline
に追加する2つの推奨方法は、手動登録と自動登録です。
より高度なワークフローがある場合や、明示的に各 Transform
オブジェクトをプロジェクトのパイプラインに追加したい場合は、手動登録を使用できます。例えば、同じデータ変換ロジックを複数の入力および出力データセットの組み合わせにメタプログラム的に適用したい場合に、手動登録を使用すると便利です。
それ以外の場合、登録コードが簡潔で包括的になるように、自動登録を使用することを強くお勧めします。自動登録では、Pipeline.autoBindFromPackage()
がパッケージ内の Transform
定義を検出します(これらのオブジェクトに必要な @Input
および @Output
アノテーションがある場合のみ)。
プロジェクトの複雑さが増すにつれて、Transform
オブジェクトを手動で Pipeline
に追加することは面倒になることがあります。したがって、Pipeline
オブジェクトは、Java パッケージ内のすべての Transform
オブジェクトを検出するための autoBindFromPackage()
メソッドを提供します。自動登録を使用するには、次の操作を行う必要があります。
Transform
に対応するクラスを定義します。自動登録では、入力および出力データセットに関する情報と計算機能が含まれるクラスを定義します。@Input
および @Output
アノテーションを追加します。Transform
定義を登録するために、Pipeline.autoBindFromPackage()
メソッドを呼び出します。autoBindFromPackage() メソッドは、必要なアノテーションがある Transform 定義のみを登録します。必要なアノテーションがない Transforms は、提供された Java パッケージに存在していても、プロジェクトの Pipeline
には追加されません。Transform
オブジェクトは、Pipeline.register()
メソッドを使用して手動で Pipeline
に追加できます。このメソッドを呼び出すことで、1つの Transform
を登録できます。Transforms を手動登録で使用するには、次の操作を行う必要があります。
Transform
オブジェクトの計算機能を含むクラスを定義します。手動登録では、PipelineDefiner 実装内で入力および出力データセットに関する情報を提供しますが、自動登録とは異なります。HighLevelTransform.builder()
または LowLevelTransform.builder()
を使用して、使用する計算機能を指定し、入力および出力データセットを提供します。Pipeline.register()
メソッドを呼び出して、明示的にプロジェクトのパイプラインに Transform
定義を追加します。@StopProgagating
や @StopRequiring
などのアノテーションは、自動登録された Java 変換でのみサポートされていることに注意してください。
データ変換が入力データセット以外のものに依存する場合があります。
たとえば、現在の Spark セッションにアクセスするか、jobSpec の変換パラメーターにアクセスする必要がある変換が必要になる場合があります。このような場合、TransformContext
オブジェクトを変換に挿入できます。これを行うには、計算機能が TransformContext
タイプのパラメーターを受け入れる必要があります。TransformContext
には、Transforms の authHeader、Spark セッション、変換パラメーター、および ServiceDiscovery
オブジェクトが含まれます。ServiceDiscovery
クラスは、検出された Foundry サービスのサービス URI を公開します。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
package myproject.datasets; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.Input; import com.palantir.transforms.lang.java.api.Output; import com.palantir.transforms.lang.java.api.TransformContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; /** * これは、TransformContext にアクセスする例としての高レベルな Transform です */ @Compute public Dataset<Row> myComputeFunction(Dataset<Row> myInput, TransformContext context) { // TransformContext から "limit" パラメータを取得し、整数に変換 int limit = (int) context.parameters().get("limit"); // 入力データセットの行数を制限 return myInput.limit(limit); }
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
package myproject.datasets; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.Input; import com.palantir.transforms.lang.java.api.Output; import com.palantir.transforms.lang.java.api.TransformContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; /** * これは、TransformContextにアクセスする低レベルのTransformの例です */ @Compute public void compute(FoundryInput input, FoundryOutput output, TransformContext context) { // コンテクストからパラメータを取得し、"limit"というキーで取得した値を整数にキャストします int limit = (int) context.parameters().get("limit"); // 入力からデータフレームを読み込み、上で設定したlimitでデータの上限を設定し、結果を出力します output.getDataFrameWriter(input.asDataFrame().read().limit(limit)).write(); }