注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
インクリメンタル計算は高度な機能です。この機能を利用する前に、ユーザーガイドの残りを理解してください。
これまでのユーザーガイドで示された変換は、実行するたびに出力データセット全体を再計算します。これは多くの不必要な作業を引き起こす可能性があります。以下の例を考えてみてください:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
package myproject.datasets; import com.palantir.transforms.lang.java.api.*; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; public final class FilterTransform { @Compute // 日本語: 計算関数 public void myComputeFunction( // 日本語: 入力データセット @Input("/examples/students_hair_eye_color") FoundryInput myInput, // 日本語: 出力データセット @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) { // 日本語: データフレームとして入力データセットを読み込む Dataset<Row> inputDf = myInput.asDataFrame().read(); // 日本語: 目が茶色のデータだけをフィルタリングし、出力データセットに書き込む myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write(); } }
/examples/students_hair_eye_color
入力データセットに新しいデータが追加されると、filter()
は入力全体に対して実行されます。これは、入力に追加された新しいデータだけではありません。これは、計算リソースと時間を無駄にすることになります。
トランスフォームがビルド履歴を認識できるようになると、出力を計算する方法をよりスマートにすることができます。具体的には、入力に対する変更を使用して、出力データセットを変更します。すでにマテリアライズされたデータを使用してテーブルを再マテリアライズするこのプロセスは、インクリメンタル計算 と呼ばれます。インクリメンタル計算がなければ、出力データセットは常にトランスフォームの最新の出力で置き換えられます。
上記の例で示したトランスフォームに戻ってみましょう。トランスフォームは、students
データセットに対して フィルター処理する()
を実行して、茶髪の学生を書き出します。インクリメンタル計算を使用すると、students
に2人の新しい学生のデータが追加された場合、トランスフォームはビルド履歴に関する情報を使用して、新しい茶髪の学生だけを出力に追加することができます。
RAW DERIVED
+---+-----+-----+ +---+-----+-----+
| id| hair| eye| | id| hair| eye|
+---+-----+-----+ Build 1 +---+-----+-----+
| 17|Black|Brown| ---------> | 18|Brown|Brown|
| 18|Brown|Brown| +---+-----+-----+
| 19| Red|Brown|
+---+-----+-----+ ...
... ...
+---+-----+-----+ Build 2 +---+-----+-----+
| 20|Brown|Brown| ---------> | 20|Brown|Brown|
| 21|Black|Blue | +---+-----+-----+
+---+-----+-----+
このコードは、データフレームの変換を示しています。具体的には、RAWデータフレームからDERIVEDデータフレームへの変換です。
transforms-javaを使用してインクリメンタル変換を書く方法について、ステップバイステップでガイドします。transforms-pythonとは対照的に、transforms-javaは注釈を使用して自動的にインクリメンタリティを検証し、インクリメンタルな方法で変換を適用することはありません。Javaでインクリメンタル変換を書くプロセスは、ユーザーが直接制御し、どのケースで変換をインクリメンタルに行い、いつ行わないかを明示的に決定できます。入力データセットがどのように変更されたかを解釈することで、ユーザーは出力データセットをインクリメンタルな方法かスナップショットのような方法で更新するかを決定できます。
取るべき最初のステップは、ユーザーの入力の解釈です。入力データセットは複数の方法で変更可能であり、特定の状況でのみインクリメンタルな変換を適用することができます。DataFrameModificationType(またはFilesModificationType)は、データセットが変更可能なさまざまな方法を表現します。異なるモードは次のとおりです:
入力がどのように変更されたかに基づいて、入力データセットから何を読み取り、出力データセットに何を書き込むかを決定することができます。
入力がどのように変更されたかを知ることで、それに応じて読み取ることができます。トランザクションがデータを追加しただけの場合、安全にインクリメンタルに行動し、変更されたものだけを読み取ることができることが確実です。一方、既存の行の変更を含む入力データセットへの変更がある場合、全ビューを再読み取りしたいと思うかもしれません。Transforms-Java APIは、readForRange()
メソッドのおかげで、入力データセットのためのさまざまな読み取りモードを可能にします。
ReadRangeは可能な読み取り範囲を公開します。異なるモードは次のとおりです:
入力の変更タイプを解釈することで、次の例で示すように、データをどのように読み取るかを決定することができます。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
private ReadRange getReadRange(FoundryInput input) { // 入力データセットの変更タイプに応じて、読み取り範囲を判断する switch (input.asDataFrame().modificationType()) { case UNCHANGED: // 入力データセットに変更がない場合、未処理の部分のみを読み取る LOG.info("No changes in input dataset, read only unprocessed"); return ReadRange.UNPROCESSED; case APPENDED: // 入力データセットに追加のみの変更がある場合、未処理の部分のみを読み取る LOG.info("Append-only changes in input dataset, read only unprocessed"); return ReadRange.UNPROCESSED; case UPDATED: // 入力データセットに更新タイプの変更がある場合、ビュー全体を読み取る LOG.info("Update-type changes in input dataset, read entire view"); return ReadRange.ENTIRE_VIEW; case NEW_VIEW: // 入力データセットに新しいビューがある場合、ビュー全体を読み取る LOG.info("New view in input dataset, read entire view"); return ReadRange.ENTIRE_VIEW; default: // 入力データセットの未知の変更タイプに対して例外をスローする throw new IllegalArgumentException("Unknown ModificationType for input dataset " + input.asDataFrame().modificationType()); } }
それに伴って、compute
メソッドを変更できます。
Copied!1 2 3 4 5 6 7 8 9 10 11
@Compute public void myComputeFunction( // "/examples/students_hair_eye_color" というパスからデータを読み込むための入力パラメータ @Input("/examples/students_hair_eye_color") FoundryInput myInput, // "/examples/students_hair_eye_color_filtered" というパスにデータを書き込むための出力パラメータ @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) { // myInputからDataFrame形式でデータを読み込みます Dataset<Row> inputDf = myInput.asDataFrame().readForRange(getReadRange(myInput)); // "eye = 'Brown'"という条件を満たすデータだけをフィルタリングして、myOutputに書き込みます myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write(); }
現時点では、入力データセットの異なる部分を読み込んでいるだけで、出力データセットに対しては異なる処理は行っていません。この例のコードをこの時点まで実行すると、入力のどの部分を読み込んでいても、出力では常にスナップショットトランザクションになります。インクリメンタル変換を適用する前に、チュートリアルを最後まで進めて、出力データセットを正しく変更する方法を理解してください。
このステップでは、ユーザーが必要なデータの変換を適用します。入力の変更に応じて、読み込まれるデータが異なることに注意してください。今回のケースでは、ブラウンアイをフィルター処理する簡単な変換を行います。これは以下のように分離できます。
Copied!1 2
// 入力データフレームから、目の色がブラウンのデータだけをフィルタリングします inputDf = inputDf.filter("eye = 'Brown'");
入力データセットの変更を解釈し、入力の所望の部分を読み取り、データを変換ロジックに従って変換した後、出力を適切に書き込むことができます。WriteMode は、異なる書き込みモードを提供します。異なるモードは以下の通りです。
例えば、このケースでは、入力変更タイプに基づいて出力タイプを選択できます。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
private WriteMode getWriteMode(FoundryInput input) { switch (input.asDataFrame().modificationType()) { case UNCHANGED: // 入力データセットに変更がない場合、更新モードで書き込みます LOG.info("No changes in input dataset, writing in update mode"); return WriteMode.UPDATE; case APPENDED: // 入力データセットに追加のみの変更がある場合、更新モードで書き込みます LOG.info("Append-only changes in input dataset, writing in update mode"); return WriteMode.UPDATE; case UPDATED: // 入力データセットに更新タイプの変更がある場合、スナップショットモードで書き込みます LOG.info("Update-type changes in input dataset, writing in snapshot mode"); return WriteMode.SNAPSHOT; case NEW_VIEW: // 入力データセットに新しいビューがある場合、スナップショットモードで書き込みます LOG.info("new view in input dataset, writing in snapshot mode"); return WriteMode.SNAPSHOT; default: // 入力データセットの未知の変更タイプの場合、例外をスローします throw new IllegalArgumentException("Unknown ModificationType for input dataset " + input.asDataFrame().modificationType()); } }
WriteMode.UPDATE
と
DataFrameModificationType.UPDATED
を混同しないでください。前者は結果として出力データセットのインクリメンタルな修正を行い、それにより下流のデータセットには DataFrameModificationType.APPENDED
が適用されます。後者は既存の行の追加と修正を含む入力データセットの修正です。
最後に、write()
関数は書き込みモードを含むように修正することができます:
Copied!1 2 3 4 5 6 7 8 9 10 11
@Compute public void myComputeFunction( // "/examples/students_hair_eye_color"というパスからデータを入力として読み込む @Input("/examples/students_hair_eye_color") FoundryInput myInput, // 処理結果を"/examples/students_hair_eye_color_filtered"というパスに出力する @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) { // 入力データをデータフレームとして読み込む Dataset<Row> inputDf = myInput.asDataFrame().readForRange(getReadRange(myInput)); // "eye = 'Brown'"という条件に一致するデータだけをフィルタリングし、その結果を出力する myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write(getWriteMode(myInput)); }
これらの要素を組み合わせることで、簡単なインクリメンタルなフィルター処理変換を構築できます。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
package myproject.datasets; import com.palantir.transforms.lang.java.api.*; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; // FilterTransformクラスの定義 public final class FilterTransform { // ログ出力用のLoggerを定義 private static final Logger LOG = LoggerFactory.getLogger(FilterTransform.class); // Computeアノテーションを付けたメソッドは、Foundryによってコンピューティングタスクとして実行されます @Compute public void myComputeFunction( // Inputアノテーションは、このメソッドがデータを読み込むための入力パスを指定します @Input("/examples/students_hair_eye_color") FoundryInput myInput, // Outputアノテーションは、このメソッドがデータを書き込むための出力パスを指定します @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) { // 入力データをDataFrameとして読み込み、フィルタリングを行います Dataset<Row> inputDf = myInput.asDataFrame().readForRange(getReadRange(myInput)); // フィルタリングした結果を出力します myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write(getWriteMode(myInput)); } // 入力データの変更タイプに基づいて、データを読み込む範囲を決定します private ReadRange getReadRange(FoundryInput input) { switch (input.asDataFrame().modificationType()) { case UNCHANGED: LOG.info("入力データセットに変更はありません、未処理の部分のみ読み込みます"); return ReadRange.UNPROCESSED; case APPENDED: LOG.info("入力データセットに追加のみの変更があります、未処理の部分のみ読み込みます"); return ReadRange.UNPROCESSED; case UPDATED: LOG.info("入力データセットに更新型の変更があります、全体を読み込みます"); return ReadRange.ENTIRE_VIEW; case NEW_VIEW: LOG.info("入力データセットに新しいビューがあります、全体を読み込みます"); return ReadRange.ENTIRE_VIEW; default: throw new IllegalArgumentException("入力データセットの未知の変更タイプ " + input.asDataFrame().modificationType()); } } // 入力データの変更タイプに基づいて、データの書き込みモードを決定します private WriteMode getWriteMode(FoundryInput input) { switch (input.asDataFrame().modificationType()) { case UNCHANGED: LOG.info("入力データセットに変更はありません、更新モードで書き込みます"); return WriteMode.UPDATE; case APPENDED: LOG.info("入力データセットに追加のみの変更があります、更新モードで書き込みます"); return WriteMode.UPDATE; case UPDATED: LOG.info("入力データセットに更新型の変更があります、スナップショットモードで書き込みます"); return WriteMode.SNAPSHOT; case NEW_VIEW: LOG.info("入力データセットに新しいビューがあります、スナップショットモードで書き込みます"); return WriteMode.SNAPSHOT; default: throw new IllegalArgumentException("入力データセットの未知の変更タイプ " + input.asDataFrame().modificationType()); } } }
上述の通り、入力の変更タイプを評価し、それに応じて入力を読み込みます。その後、出力データセットを増分更新するか、新しいスナップショットトランザクションを開始するかを決定します。
たとえば、ほとんどの場合は増分変換を実行したいが、時折データセットのスナップショットを再実行する必要があるとします。
手作業で望む結果をハードコーディングするのを避けるために、この入力が変更されるたびに出力が SNAPSHOT
書き込みモードを使用するような結果をもたらす新しい入力を追加できます。この新しい入力は、実質的にスナップショットトリガーデータセットとして機能します。この新しいスナップショットトリガーデータセットの変更タイプに基づいて、変換の他の入力の読み込み範囲を調整する必要があることに注意してください。
また、証明なしの空の追加トランザクションを作成することにより、外部からスナップショットを強制的に作成することも可能です。ただし、transforms-java はこのような機能を公開していないため、このガイドの範囲外です。
このセクションの高度な機能は、正しく使用しないと深刻な悪影響を及ぼす可能性があります。その影響を完全に理解していない場合は、これらの機能を使用しないでください。適切な注意と警戒心を持たずに実行すると、望ましくない結果が発生する可能性が高まります。質問がある場合は、Palantir の担当者にお問い合わせください。
高度な機能は通常、ユーザーの @Compute
関数の上に注釈として含まれています。しかし、変換が手動で登録されている場合は、Transform Builder にプロパティを追加する必要があります。
増分ビルドがアペンド専用データセットが無限に成長し、その成長に必要なディスクスペースが不足している場合、上流データセットの一部を削除する必要があります。
しかし、これは原始データセットの変更が APPENDED
変更タイプを結果としないため、増分性を壊す可能性があります。
IncrementalOptions.IGNORE_INCREMENTAL_DELETES
はこれを避け、上流データセットの削除を破壊的な変更とは扱わないようにします。
低レベルの変換でのみ、増分削除を無視することが可能です。
Copied!1 2 3 4 5 6 7 8
@Compute // @UseIncrementalOptions(IncrementalOptions.IGNORE_INCREMENTAL_DELETES) アノテーションは、増分削除を無視するように指示します。 public void myComputeFunction( // @Input アノテーションは、関数が入力として使用するデータの場所を指定します。 @Input("/Users/admin/students_data") FoundryInput myInput, // @Output アノテーションは、関数が出力として使用するデータの場所を指定します。 @Output("/Users/admin/students_data_filtered") FoundryOutput myOutput) { ...
もし、ユーザーの変換が手動で登録されている場合は、以下のコードブロックのようにプロパティをビルダーに追加してください。
Copied!1 2 3 4 5 6 7 8 9 10 11
// 低レベル変換を手動で行うインスタンスを作成 LowLevelTransform lowLevelManualTransform = LowLevelTransform.builder() // MyLowLevelManualFunctionを計算関数インスタンスとして設定 .computeFunctionInstance(new MyLowLevelManualFunction()) // 入力データセットのパスを設定 .putParameterToInputAlias("myInput", "/path/to/input/dataset") // 出力データセットのパスを設定 .putParameterToOutputAlias("myOutput", "/path/to/output/dataset") // インクリメンタルな削除を無視 .ignoreIncrementalDeletes(true) .build();
入力データセットのスキーマ変更は、インクリメンタル変換と組み合わせると予期しない結果をもたらす可能性があります。
以下のすべてのドキュメンテーションを読み、この機能を使用する前にすべての潜在的な影響を理解してください。
スキーマ変更を無視できるのは、低レベルの変換のみです。
インクリメンタルビルドが依存するデータセットのスキーマが変更されると、その変更はDataFrameModificationType.NEW_VIEW
を結果として引き起こし、可能性としてインクリメンタル性が破壊されます。
ただし、IncrementalOptions.USE_SCHEMA_MODIFICATION_TYPE
オプションが設定されている場合、スキーマ変更は新しいビューを引き起こさない。代わりに、入力データセットのスキーマ変更はDataFrameModificationType.UNCHANGED
と解釈され、スキーマ変更タイプのフラグSchemaModificationType.NEW_SCHEMA
が設定され、ユーザーがこの特別なケースを明示的に処理することができます。
Copied!1 2 3 4 5 6 7 8 9 10 11
@Compute // スキーマの変更タイプを使用するための注釈 @UseIncrementalOptions(IncrementalOptions.USE_SCHEMA_MODIFICATION_TYPE) // myComputeFunctionという計算関数を定義 public void myComputeFunction( // 入力データへのパスを指定 @Input("/Users/admin/students_data") FoundryInput myInput, // 出力データへのパスを指定 @Output("/Users/admin/students_data_filtered") FoundryOutput myOutput) { ... }
あなたの変換が手動で登録されている場合、以下のコードブロックのようにプロパティをビルダーに追加してください。
Copied!1 2 3 4 5 6 7 8 9 10 11 12
// LowLevelTransformのインスタンスをビルダーパターンを用いて作成します。 LowLevelTransform lowLevelManualTransform = LowLevelTransform.builder() // MyLowLevelManualFunctionのインスタンスを計算機能として設定します。 .computeFunctionInstance(new MyLowLevelManualFunction()) // "myInput"という名前のパラメータを入力データセットのパスにエイリアスとして設定します。 .putParameterToInputAlias("myInput", "/path/to/input/dataset") // "myOutput"という名前のパラメータを出力データセットのパスにエイリアスとして設定します。 .putParameterToOutputAlias("myOutput", "/path/to/output/dataset") // スキーマ変更タイプを使用するように設定します。 .useSchemaModificationType(true) // ビルドを行い、LowLevelTransformのインスタンスを完成させます。 .build();
変換に関連するビルドは、変換が入力データセットにどのように依存するかによって成功するか失敗するかが決まります。 より正確には、変換がスキーマ変更に関与する行に依存している場合、それらの行に対する変更により、インクリメンタル変換が失敗します。 このような場合、再度インクリメンタル変換を使用できるようにする前に、新しいスナップショットが必要です。
変換が特定の行に依存している場合:
filter("eye = 'Brown'")
があり、RAWデータセットで行eye
が名前変更されたり削除されたりした場合、FilterTransform
を再トリガーするとインクリメンタル更新が失敗します)。hair
を削除した場合、FilterTransform
が失敗します)。変換がスキーマ変更に依存しない場合、インクリメンタルビルドは成功します。
たとえば、最初に変換にid
とeye
のselect
文を追加し、RAWデータセットから初期スナップショットビルドをトリガーし、次にRAWデータセットで行hair
を削除した場合、インクリメンタルビルドは成功し、スキーマ変更はインクリメンタル変換に影響を与えません。
また、スキーマへの追加的な変更(例:新しい行の追加)がある場合も、ビルドは常に成功します。