注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

インクリメンタルトランスフォームのリファレンス

インクリメンタルデコレーター

Tip

このガイドの残りの部分では、インクリメンタルビルドと非インクリメンタルビルドについて説明しています。すべてのケースでincremental()デコレーターが使用されていることを前提としています。そのため、この用語は実際にトランスフォームがインクリメンタルに実行されるかどうかを指します。

incremental()デコレーターは、トランスフォームの計算関数をインクリメンタル計算を可能にするロジックでラップするために使用できます:

Copied!
1 2 3 4 5 6 7 8 9 10 11 from transforms.api import transform, incremental, Input, Output @incremental() @transform( students=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def filter_hair_color(students, processed): # type: (IncrementalTransformInput, IncrementalTransformOutput) -> None students_df = students.dataframe('added') processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))

incremental()デコレーターは、既存のtransform(), transform_df(), またはtransform_pandas()デコレーターを使用するトランスフォームをラップするために使用できます。トランスフォームの計算関数はインクリメンタルおよび非インクリメンタルの両方で実行できることをサポートする必要がありますincremental()デコレーターは次の2つの重要なことを行います:

  • トランスフォームが前回のビルドに関する情報を検索できるようにします。この情報を使用して、incremental()デコレーターは、以下の要件に従ってトランスフォームがインクリメンタルに実行できるかどうかを決定します。
  • 入力、出力、およびコンテキストオブジェクトを追加機能を提供するインクリメンタルサブクラスに変換します。具体的には、TransformInputIncrementalTransformInputに、TransformOutputIncrementalTransformOutputに、TransformContextIncrementalTransformContextになります。これらのインクリメンタルオブジェクトは、デコレーターでラップされたトランスフォームに渡されます。

インクリメンタルデコレーターは6つの引数を取ります:

Copied!
1 2 3 4 5 6 7 8 transforms.api.incremental( require_incremental=False, semantic_version=1, snapshot_inputs=None, allow_retention=False, strict_append=False, v2_semantics=False )

require_incremental引数をTrueに設定すると、インクリメンタルに実行できない場合にトランスフォームが失敗します。require_incremental=Trueでもスナップショットとして実行が許可されるケースは2つあります:

  1. 出力の1つが以前にビルドされたことがない。
  2. セマンティックバージョンが変更された場合、スナップショットが明示的に要求されたことを意味します。

トランスフォームがインクリメンタルに実行されない原因をデバッグするには、ドライバーログで警告transforms.api._incremental: Not running incrementallyを確認してください。

:func:~transforms.api.incrementalデコレーターのsemantic_version引数は、トランスフォームの次回の実行を非インクリメンタルに強制することができます。

  • 現在の実行のセマンティックバージョンが前回の実行のセマンティックバージョンと異なる場合、トランスフォームは非インクリメンタルに実行されます。
  • 指定されていない場合、セマンティックバージョンは1に設定されます。
  • 前回の実行のセマンティックバージョンが存在しない場合(例:既存のトランスフォームをインクリメンタルトランスフォームに変換する場合)、値1が想定されます。これにより、新しいスナップショットを必要とせずにトランスフォームをインクリメンタルに実行し始めることができます。
  • トランスフォームの次回の実行を非インクリメンタルに強制するには、@incremental()デコレーターのsemantic_version引数を増やします。
    • セマンティックバージョンを増やす際には、整数のみを使用する必要があることに注意してください。

snapshot_inputs引数は、一部の入力をスナップショット入力として定義することを可能にします。スナップショット入力は、非スナップショット入力とは異なり、更新および削除の変更をサポートします。詳細はスナップショット入力を参照してください。

allow_retention引数をTrueに設定すると、Foundry Retentionによる入力および出力データセット内のファイルの削除が許可され、トランスフォームのインクリメンタリティが維持されます。

strict_appendパラメータがTrueに設定され、入力データセットがインクリメンタルである場合、基礎となるFoundryのトランザクションタイプはAPPENDに設定され、インクリメンタル書き込みにはAPPENDトランザクションが使用されます。既存のファイルを上書きしようとすると例外が発生します。 入力データセットがインクリメンタルでない場合、strict_appendSNAPSHOTとして実行されます。コードがAPPENDとしてインクリメンタルに実行されることを保証するには、require_incremental=Trueを使用してください。既存のファイルを上書きしようとすると成功します。 書き込み操作は、Parquetの要約メタデータやHadoopのSUCCESSファイルなどの補助ファイルを含め、ファイルを上書きしない場合があります。すべてのFoundry形式のインクリメンタル書き込みは、strict_appendモードをサポートする必要があります。

v2_semanticsパラメータがTrueに設定されている場合、V2インクリメンタルセマンティクスが使用されます。v2とv1のインクリメンタルセマンティクスの間に動作の違いはなく、すべてのユーザーがこれをTrueに設定することをお勧めします。v2セマンティクスを使用している場合にのみ、非カタログ入力および出力リソースをインクリメンタルに読み書きすることができます。

重要な情報

前述のように、incremental()デコレーターでラップされたトランスフォームの計算関数は、インクリメンタルおよび非インクリメンタルの両方で実行できることをサポートする必要があります。デフォルトの読み書きモード(このページの残りの部分で詳しく説明されています)は、この二重ロジック要件をサポートしますが、計算コンテキストis_incrementalプロパティに基づいて分岐する必要がある場合もあります。

もう1つの重要な点は、transform_df()またはtransform_pandas()incremental()デコレーターを使用する場合、デフォルトの読み書きモードにのみアクセスできることです。追加された出力行が追加された入力行の関数であるトランスフォームがある場合、これで十分です。ただし、入力の読み取りモードまたは出力の書き込みモードを設定する必要がある結合、集計、または区別などの複雑なロジックを実行する場合は、transform()incremental()デコレーターを使用する必要があります。transform()とインクリメンタルデコレーターを使用することにより、読み書きモードを設定することができます。

Warning

コードリポジトリのプレビューフィーチャーは、常にトランスフォームを非インクリメンタルモードで実行します。これは、require_incremental=Trueincremental()デコレーターに渡された場合でも同様です。

入力と出力のインクリメンタルモード

IncrementalTransformInput

transforms.api.IncrementalTransformInputオブジェクトは、オプションの読み取りモードを取るためにdataframe()メソッドを拡張します。

オプションの入力読み取りモードパラメータは、transform()デコレーターを使用している場合にのみ利用可能です。transform_df()およびtransform_pandas()デコレーターは、引数なしで入力に対してdataframe()およびpandas()を呼び出し、PySparkおよびPandas DataFrameオブジェクトを抽出します。これにより、使用される読み取りモードは常にデフォルトのaddedモードになります。

インクリメンタルデコレーターを使用してトランスフォームを定義する場合、トランスフォームがインクリメンタルまたは非インクリメンタルに実行されるかどうかに応じて、読み取りモードの動作は異なります:

読み取りモードインクリメンタル動作非インクリメンタル動作
added *前回のトランスフォームの実行以降に入力に追加された新しい行を含むDataFrameを返します。すべての行が未処理と見なされるため、データセット全体を含むDataFrameを返します。
previous前回のトランスフォーム実行時に与えられた入力全体を含むDataFrameを返します。空のDataFrameを返します。
current現在の実行のための入力データセット全体を含むDataFrameを返します。現在の実行のための入力データセット全体を含むDataFrameを返します。これはaddedと同じです。

デフォルトの読み取りモードはaddedです。

トランスフォームがincremental()とマークされているにもかかわらず、入力がインクリメンタルな方法で扱われることが望ましくない場合があります。これに関する詳細とこれらのタイプの入力に対する読み取りモードの動作の違いについては、スナップショット入力を参照してください。

デフォルトの出力読み取りモードはcurrentであり、利用可能な出力読み取りモードはaddedcurrent、およびpreviousです。出力読み取りモードに関する詳細は以下のセクションを参照してください。

インクリメンタルトランスフォームの性質上、入力ビューを構築するために、入力データセットの過去のすべてのトランザクションを最後のSNAPSHOTトランザクションからロードします。インクリメンタルトランスフォームで進行的な遅延が見られる場合、インクリメンタル入力データセットでスナップショットビルドを実行することをお勧めします。

IncrementalTransformOutput

transforms.api.IncrementalTransformOutputオブジェクトは、出力データセットの読み取りおよび書き込みモードへのアクセスを提供します。インクリメンタルおよび非インクリメンタルビルドと互換性のあるロジックを書く鍵はmodifyのデフォルト書き込みモードです。書き込みモードは2つあります:

  • modify: このモードは、ビルド中に書き込まれたデータで既存の出力を修正します。たとえば、出力がmodifyモードのときにwrite_dataframe()を呼び出すと、書き込まれたDataFrameが既存の出力に追加されます。
  • replace: このモードは、ビルド中に書き込まれたデータで出力を完全に置き換えます。

トランスフォームがインクリメンタルに実行されると、出力のデフォルト書き込みモードはmodifyに設定されます。トランスフォームが非インクリメンタルに実行されると、出力のデフォルト書き込みモードはreplaceに設定されます。

入力DataFrameのデフォルト読み取りモードはaddedです。デフォルトの入力読み取りモードがaddedであり、デフォルトの出力書き込みモードがmodifyであるため、インクリメンタルおよび非インクリメンタルビルドと互換性のあるロジックを書くことがはるかに簡単になります:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @incremental() @transform( students=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def incremental_filter(students, processed): # type: (IncrementalTransformInput, IncrementalTransformOutput) -> None # まだ見たことのない行のみを読み込みます。 new_students_df = students.dataframe() # これはstudents.dataframe('added')と同等です # 非インクリメンタルの場合、すべての行を読み込み、出力を置き換えます。 # インクリメンタルの場合、新しい行のみを読み込み、出力に追加します。 processed.write_dataframe( new_students_df.filter(new_students_df.hair == 'Brown') )

インクリメンタル計算のより複雑なユースケースもあります。正しい書き込みモードを計算し、手動で設定する必要がある場合があります。これは、インクリメンタル出力でset_mode()メソッドを使用して行うことができます。

出力書き込みモードは、transform()デコレーターを使用している場合にのみ手動で設定できます。このデコレーターを使用すると、出力を保存するためにwrite_dataframe()メソッドを明示的に呼び出す前にset_mode()を使用できます。一方、transform_df()およびtransform_pandas()デコレーターは、DataFrame出力を保存するためにwrite_dataframe()およびwrite_pandas()を呼び出します。これにより、使用される書き込みモードはincremental()デコレーターによって決定されます。

Warning

set_mode()を使用する場合、トランスフォームがインクリメンタルまたは非インクリメンタルに実行される場合の両方でこれが有効な動作であることを確認する価値があります。そうでない場合は、is_incrementalプロパティを利用する必要があります。

書き込みモードに加えて、transforms.api.IncrementalTransformOutputは、出力データセットからDataFrameを読み取ることを可能にします。これは、再びオプションの読み取りモードを取るdataframe()メソッドを使用して行うことができます。デフォルトの読み取りモードはcurrentに設定されており、他の利用可能な出力読み取りモードはaddedおよびpreviousです。読み取りモードは、データセットの書き込みモードが設定されているものに応じて異なる動作をします。

Tip

デフォルトの読み取りモードはcurrentですが、ほとんどの場合、実際にはpreviousを使用することをお勧めします。他の読み取りモードは、データセットを書き込んだ後に読み取るために使用する必要があります。

前回の実行からのデータを読み取る、正しい組み合わせ:

トランスフォームはインクリメンタルに実行されている必要があります(ctx.is_incremental is True)で、トランスフォームの前回の出力にアクセスすることができます。トランスフォームが非インクリメンタルに実行されている場合、通常は前回の出力にアクセスできる読み取りモードは前回の出力データの行を返しません。

出力読み取りモード出力書き込みモード新しいデータがまだ書き込まれているか?動作
currentmodifyNoこれらの設定には使用ケースがありません。代わりにpreviousモードを使用してください。
currentmodifyYesdataframe()は、トランスフォームの前回の出力と現在実行中のビルドに書き込まれたデータを返します。
currentreplaceNoこれらの設定には使用ケースがありません。代わりにpreviousモードを使用してください。
currentreplaceYesdataframe()は、現在実行中のビルドに書き込まれたデータを返します。
addedmodify/replaceNoこれらの設定には使用ケースがありません。代わりにpreviousモードを使用してください。
addedmodify/replaceYesdataframe()は、現在実行中のビルドに書き込まれたデータを返します。
previousmodifyYes/Nodataframe()は、トランスフォームの前回の出力を返します。トランスフォームが非インクリメンタルに実行されている場合、previousで読み取るときにスキーマが必要です。
previousreplaceYes/Nodataframe()は、トランスフォームの前回の出力を返します。トランスフォームが非インクリメンタルに実行されている場合、previousで読み取るときにスキーマが必要です。

前回のDataFrameを読み取るときに提供されるスキーマ ↗は、トランスフォームが非インクリメンタルに実行されている場合に空のDataFrameを生成するために使用されます。トランスフォームがインクリメンタルに実行されている場合、このスキーマは最後の出力の実際のスキーマと比較されます。列の型、列のnull可能性、または列の順序が一致しない場合、例外が発生します。列の順序が同じであることを確認するには、次の構造を使用できます:

Copied!
1 2 3 previous = out.dataframe('previous', schema) # schemaはpyspark.sql.types.StructTypeです out.write_dataframe(df.select(schema.fieldNames()))

Foundryは、トランスフォームで使用されるスキーマに関係なく、すべての列をnullableとして保存します。その結果、previousモードで出力から読み取るときに、いくつかのフィールドが非nullableに設定されたスキーマを提供すると、SchemaMismatchErrorでビルドが失敗します。

詳細については、マージと置換の例を参照してください。

現在の実行中に書き込まれたデータを読み取る、正しい組み合わせ:

出力読み取りモード出力書き込みモード新しいデータがまだ書き込まれているか?
currentまたはaddedmodify / replaceYes

意図を明確にするためにaddedを使用することをお勧めします。 現在の変換によって書き込まれたデータを読むことが有益なシナリオは、データをチェックして、チェックが通らなかった場合にビルドを失敗させることです。この方法では、データを再計算したり、Sparkでキャッシュしたりする必要がありません。

IncrementalTransformContext

TransformContextオブジェクトと比較して、IncrementalTransformContextオブジェクトは追加のプロパティis_incrementalを提供します。このプロパティは、トランスフォームがインクリメンタルに実行されている場合にTrueに設定されます。つまり:

  • デフォルトの出力書き込みモードはmodifyに設定され、
  • 入力のデフォルト読み取りモードはaddedに設定されます。

インクリメンタルモードの概要

インクリメンタルデコレーターを使用すると、読み取りモード「previous」を指定することで、トランスフォームの前回の入力および出力にアクセスできます。これにより、現在のビルドを過去のコンテキストに基づいて行うことができます。トランスフォームがスナップショットモードで実行される場合、「previous」DataFrameは空になります。これは、初回の実行であるか、ロジックやデータが大幅に変更されて再計算が必要なためです。

ただし、最も一般的なケースは、入力に「added」モードを使用し、出力に「modify」モードを使用することです。これらのモードはデフォルトで使用されます。これにより、入力データセットから新たに追加された行を取得し、それらを処理して出力データセットに追加することができます。

出力に行を追加する代わりに、出力データセットに既に存在する行を修正したい場合があります。そのためには、「replace」モードを使用します。一般的なシナリオの例で示されています。

インクリメンタル計算の要件

茶色の髪の学生のみを含むように学生をフィルタリングするインクリメンタルトランスフォームを分析してみましょう:

Copied!
1 2 3 4 5 6 7 8 @incremental() @transform( students=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def filter_hair_color(students, processed): students_df = students.dataframe() processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))

/examples/students_hair_eye_color入力データセットが新しい学生のセットで完全に置き換えられたとします。見ての通り、新しい学生のセットを前回の出力に追加すると、誤った出力データセットが生成されます。これは、incremental()デコレーターがトランスフォームをインクリメンタルに実行しないことを決定する状況の例です。 トランスフォームをインクリメンタルに実行するためには、次の要件を満たす必要があります:

トランスフォームにincremental()デコレーターがあるが、上記の要件のいずれかが満たされない場合、トランスフォームは自動的に非インクリメンタルに実行されます。 これにより、デフォルトの出力書き込みモードがmodifyではなくreplaceに設定され、入力が非インクリメンタルに提示されます。トランスフォームでの出力の読み取りは、以前の履歴にアクセスできないため、空のDataFrameを返します。同様に、入力も非インクリメンタルに提示されます。require_incremental=Trueを設定すると、トランスフォームは非インクリメンタルに実行される代わりに失敗します。

特定の入力を完全に書き換えることができ、トランスフォームをインクリメンタルに実行する能力に影響を与えないことが望ましい場合があります。詳細はスナップショット入力を参照してください。

Tip

トランスフォームをインクリメンタルにのみ実行する(以前に実行されたことがない場合やセマンティックバージョンが変更された場合を除く)ことを強制するには、incrementalデコレーターにrequire_incremental=True引数を渡します。トランスフォームがインクリメンタルに実行できない場合、非インクリメンタルに実行しようとする代わりに故意に失敗します。

追加のみの入力変更

トランスフォームは、すべてのインクリメンタル入力が前回の実行以降に追加ファイルのみを持っていた場合(APPENDまたはUPDATE トランザクションによって)インクリメンタルに実行できます。

逆に、トランスフォームは、いずれかのインクリメンタル入力が

  • 完全に再書き込みされた場合、例:SNAPSHOTトランザクションを持っていた場合、
  • UPDATEまたはDELETEトランザクションを通じて更新または削除されたファイルを持っていた場合、インクリメンタルに実行できません。

たとえば、students_hair_eye_colorの学生リストが完全に変更された場合、フィルタリングされた学生の前回の出力は無効であり、置き換えられる必要があります。

Foundry Retentionからの削除を伴う入力

上流のデータセットが無限に成長し、Foundry Retentionを使用して古い行を削除しても下流の計算のインクリメンタリティに影響を与えないようにしたい場合、そのデータセットに依存するインクリメンタルトランスフォームは、保持された入力を許可するように明示的に設定する必要があります。これは、transforms.api.incrementalデコレーターのallow_retention引数を使用して行うことができます。

  • このフィールドをTrueに設定すると、Foundry Retentionからの削除がインクリメンタリティを維持するかどうかを評価する際に無視されます。これにより、Retentionからのremoved入力はインクリメンタリティを損なわず、唯一の非added入力が保持された行を持つ入力である場合、トランスフォームはインクリメンタルに実行されます。
  • フィールドがFalse(デフォルト)の場合、入力データセットのremovedタイプの変更は、トランスフォームがスナップショットを実行する原因となります。
Copied!
1 2 3 4 5 6 7 8 @incremental(allow_retention=True) @transform( students=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def filter_hair_color(students, processed): students_df = students.dataframe() processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))

上記の例では、/examples/students_hair_eye_colorデータセットでの変更セットにaddedの変更とFoundry Retentionを使用したremovedの変更のみが含まれている場合、トランスフォームはインクリメンタルに実行されます。他の方法で行われたremovedの変更やmodifiedの変更が存在する場合、スナップショットがトリガーされます。

Warning

allow_retention=Trueを指定すると、Foundry Retentionからのremoved変更のみがインクリメンタリティに影響を与えないようにします。入力データセットの他の削除は、インクリメンタル計算の代わりにスナップショットを実行させる原因となります。

スナップショット入力

入力を完全に書き換えることが許可されており、トランスフォームのインクリメンタリティを無効にしないシナリオがあります。たとえば、電話番号の国コードを国にマッピングする単純な参照データセットがあり、これが定期的に書き換えられるとします。このデータセットの変更は、以前の計算の結果を必ずしも無効にしないため、トランスフォームがインクリメンタルに実行されることを妨げるべきではありません。

デフォルトでは、上記のように、トランスフォームは、トランスフォームが最後に実行された以降に入力が完全に書き換えられた場合、インクリメンタルに実行できません。スナップショット入力はこのチェックから除外され、実行間で開始トランザクションが異なることが許可されます。

スナップショット入力は、incremental()デコレーターのsnapshot_inputs引数を使用して構成できます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @incremental(snapshot_inputs=['country_codes']) @transform( phone_numbers=Input('/examples/phone_numbers'), country_codes=Input('/examples/country_codes'), output=Output('/examples/phone_numbers_to_country') ) def map_phone_number_to_country(phone_numbers, country_codes, output): # type: (TransformInput, TransformInput, TransformOutput) -> None # これは前回の実行以降に見られていないすべての電話番号です phone_numbers = phone_numbers.dataframe() # これは、以前に見たことがあるかどうかに関係なく、すべての国コードです country_codes = country_codes.dataframe() cond = [phone_numbers.country_code == country_codes.code] output.write_dataframe(phone_numbers.join(country_codes, on=cond, how='left_outer'))

スナップショット入力の動作は、トランスフォームがインクリメンタルに実行される場合と非インクリメンタルに実行される場合で同じです。そのため、addedおよびcurrent読み取りモードは常にデータセット全体を返します。他のすべての読み取りモードは空のデータセットを返します。

スナップショット入力の以前に見たバージョンに制約がないため、トランスフォームをインクリメンタルに実行する能力を保持しながら、スナップショット入力を追加または削除することができます。入力の変更がトランスフォームのセマンティクスを根本的に変更する場合、incremental()デコレーターのsemantic_version引数を更新する価値があるかどうかを検討する価値があります。

入力の変更

既存の入力リストを変更できます。インクリメンタリティは次の場合に保持されます:

  • 新しい入力または新しいスナップショット入力が追加された場合、または
  • 既存の入力または既存のスナップショット入力が削除された場合。 インクリメンタルトランスフォームには少なくとも1つの入力が必要です。

また、非スナップショット入力データセットの開始トランザクションが前回の実行で使用されたものと一致している必要があります。

同じトランスフォームによって最後にビルドされた出力

複数出力のインクリメンタルトランスフォームの場合、各出力の最後にコミットされたトランザクションは同じトランスフォームから生成されたものでなければなりません。

インクリメンタル計算の要件の概要

トランスフォームは、すべてのインクリメンタル入力がファイルのみを追加した場合、またはファイルが削除された場合、allow_retention=TrueでFoundry Retentionを使用して削除された場合にのみ、インクリメンタルに実行できます。スナップショット入力はこのチェックから除外されます。