注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
このガイドの残りの部分では、インクリメンタルビルドと非インクリメンタルビルドについて説明しています。すべてのケースでincremental()
デコレーターが使用されていることを前提としています。そのため、この用語は実際にトランスフォームがインクリメンタルに実行されるかどうかを指します。
incremental()
デコレーターは、トランスフォームの計算関数をインクリメンタル計算を可能にするロジックでラップするために使用できます:
Copied!1 2 3 4 5 6 7 8 9 10 11
from transforms.api import transform, incremental, Input, Output @incremental() @transform( students=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def filter_hair_color(students, processed): # type: (IncrementalTransformInput, IncrementalTransformOutput) -> None students_df = students.dataframe('added') processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
incremental()
デコレーターは、既存のtransform()
, transform_df()
, またはtransform_pandas()
デコレーターを使用するトランスフォームをラップするために使用できます。トランスフォームの計算関数はインクリメンタルおよび非インクリメンタルの両方で実行できることをサポートする必要があります。incremental()
デコレーターは次の2つの重要なことを行います:
incremental()
デコレーターは、以下の要件に従ってトランスフォームがインクリメンタルに実行できるかどうかを決定します。TransformInput
はIncrementalTransformInput
に、TransformOutput
はIncrementalTransformOutput
に、TransformContext
はIncrementalTransformContext
になります。これらのインクリメンタルオブジェクトは、デコレーターでラップされたトランスフォームに渡されます。インクリメンタルデコレーターは6つの引数を取ります:
Copied!1 2 3 4 5 6 7 8
transforms.api.incremental( require_incremental=False, semantic_version=1, snapshot_inputs=None, allow_retention=False, strict_append=False, v2_semantics=False )
require_incremental
引数をTrue
に設定すると、インクリメンタルに実行できない場合にトランスフォームが失敗します。require_incremental=True
でもスナップショットとして実行が許可されるケースは2つあります:
トランスフォームがインクリメンタルに実行されない原因をデバッグするには、ドライバーログで警告transforms.api._incremental: Not running incrementally
を確認してください。
:func:~transforms.api.incremental
デコレーターのsemantic_version
引数は、トランスフォームの次回の実行を非インクリメンタルに強制することができます。
@incremental()
デコレーターのsemantic_version
引数を増やします。
snapshot_inputs
引数は、一部の入力をスナップショット入力として定義することを可能にします。スナップショット入力は、非スナップショット入力とは異なり、更新および削除の変更をサポートします。詳細はスナップショット入力を参照してください。
allow_retention
引数をTrue
に設定すると、Foundry Retentionによる入力および出力データセット内のファイルの削除が許可され、トランスフォームのインクリメンタリティが維持されます。
strict_append
パラメータがTrue
に設定され、入力データセットがインクリメンタルである場合、基礎となるFoundryのトランザクションタイプはAPPEND
に設定され、インクリメンタル書き込みにはAPPEND
トランザクションが使用されます。既存のファイルを上書きしようとすると例外が発生します。
入力データセットがインクリメンタルでない場合、strict_append
はSNAPSHOT
として実行されます。コードがAPPEND
としてインクリメンタルに実行されることを保証するには、require_incremental=True
を使用してください。既存のファイルを上書きしようとすると成功します。
書き込み操作は、Parquetの要約メタデータやHadoopのSUCCESS
ファイルなどの補助ファイルを含め、ファイルを上書きしない場合があります。すべてのFoundry形式のインクリメンタル書き込みは、strict_append
モードをサポートする必要があります。
v2_semantics
パラメータがTrue
に設定されている場合、V2インクリメンタルセマンティクスが使用されます。v2とv1のインクリメンタルセマンティクスの間に動作の違いはなく、すべてのユーザーがこれをTrue
に設定することをお勧めします。v2セマンティクスを使用している場合にのみ、非カタログ入力および出力リソースをインクリメンタルに読み書きすることができます。
前述のように、incremental()
デコレーターでラップされたトランスフォームの計算関数は、インクリメンタルおよび非インクリメンタルの両方で実行できることをサポートする必要があります。デフォルトの読み書きモード(このページの残りの部分で詳しく説明されています)は、この二重ロジック要件をサポートしますが、計算コンテキストのis_incremental
プロパティに基づいて分岐する必要がある場合もあります。
もう1つの重要な点は、transform_df()
またはtransform_pandas()
とincremental()
デコレーターを使用する場合、デフォルトの読み書きモードにのみアクセスできることです。追加された出力行が追加された入力行の関数であるトランスフォームがある場合、これで十分です。ただし、入力の読み取りモードまたは出力の書き込みモードを設定する必要がある結合、集計、または区別などの複雑なロジックを実行する場合は、transform()
とincremental()
デコレーターを使用する必要があります。transform()
とインクリメンタルデコレーターを使用することにより、読み書きモードを設定することができます。
コードリポジトリのプレビューフィーチャーは、常にトランスフォームを非インクリメンタルモードで実行します。これは、require_incremental=True
がincremental()
デコレーターに渡された場合でも同様です。
transforms.api.IncrementalTransformInput
オブジェクトは、オプションの読み取りモードを取るためにdataframe()
メソッドを拡張します。
オプションの入力読み取りモードパラメータは、transform()
デコレーターを使用している場合にのみ利用可能です。transform_df()
およびtransform_pandas()
デコレーターは、引数なしで入力に対してdataframe()
およびpandas()
を呼び出し、PySparkおよびPandas DataFrameオブジェクトを抽出します。これにより、使用される読み取りモードは常にデフォルトのadded
モードになります。
インクリメンタルデコレーターを使用してトランスフォームを定義する場合、トランスフォームがインクリメンタルまたは非インクリメンタルに実行されるかどうかに応じて、読み取りモードの動作は異なります:
読み取りモード | インクリメンタル動作 | 非インクリメンタル動作 |
---|---|---|
added * | 前回のトランスフォームの実行以降に入力に追加された新しい行を含むDataFrame ↗を返します。 | すべての行が未処理と見なされるため、データセット全体を含むDataFrame ↗を返します。 |
previous | 前回のトランスフォーム実行時に与えられた入力全体を含むDataFrame ↗を返します。 | 空のDataFrame ↗を返します。 |
current | 現在の実行のための入力データセット全体を含むDataFrame ↗を返します。 | 現在の実行のための入力データセット全体を含むDataFrame ↗を返します。これはadded と同じです。 |
デフォルトの読み取りモードはadded
です。
トランスフォームがincremental()
とマークされているにもかかわらず、入力がインクリメンタルな方法で扱われることが望ましくない場合があります。これに関する詳細とこれらのタイプの入力に対する読み取りモードの動作の違いについては、スナップショット入力を参照してください。
デフォルトの出力読み取りモードはcurrent
であり、利用可能な出力読み取りモードはadded
、current
、およびprevious
です。出力読み取りモードに関する詳細は以下のセクションを参照してください。
インクリメンタルトランスフォームの性質上、入力ビューを構築するために、入力データセットの過去のすべてのトランザクションを最後のSNAPSHOT
トランザクションからロードします。インクリメンタルトランスフォームで進行的な遅延が見られる場合、インクリメンタル入力データセットでスナップショットビルドを実行することをお勧めします。
transforms.api.IncrementalTransformOutput
オブジェクトは、出力データセットの読み取りおよび書き込みモードへのアクセスを提供します。インクリメンタルおよび非インクリメンタルビルドと互換性のあるロジックを書く鍵はmodify
のデフォルト書き込みモードです。書き込みモードは2つあります:
modify
: このモードは、ビルド中に書き込まれたデータで既存の出力を修正します。たとえば、出力がmodify
モードのときにwrite_dataframe()
を呼び出すと、書き込まれたDataFrame
↗が既存の出力に追加されます。replace
: このモードは、ビルド中に書き込まれたデータで出力を完全に置き換えます。トランスフォームがインクリメンタルに実行されると、出力のデフォルト書き込みモードはmodify
に設定されます。トランスフォームが非インクリメンタルに実行されると、出力のデフォルト書き込みモードはreplace
に設定されます。
入力DataFrameのデフォルト読み取りモードはadded
です。デフォルトの入力読み取りモードがadded
であり、デフォルトの出力書き込みモードがmodify
であるため、インクリメンタルおよび非インクリメンタルビルドと互換性のあるロジックを書くことがはるかに簡単になります:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
@incremental() @transform( students=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def incremental_filter(students, processed): # type: (IncrementalTransformInput, IncrementalTransformOutput) -> None # まだ見たことのない行のみを読み込みます。 new_students_df = students.dataframe() # これはstudents.dataframe('added')と同等です # 非インクリメンタルの場合、すべての行を読み込み、出力を置き換えます。 # インクリメンタルの場合、新しい行のみを読み込み、出力に追加します。 processed.write_dataframe( new_students_df.filter(new_students_df.hair == 'Brown') )
インクリメンタル計算のより複雑なユースケースもあります。正しい書き込みモードを計算し、手動で設定する必要がある場合があります。これは、インクリメンタル出力でset_mode()
メソッドを使用して行うことができます。
出力書き込みモードは、transform()
デコレーターを使用している場合にのみ手動で設定できます。このデコレーターを使用すると、出力を保存するためにwrite_dataframe()
メソッドを明示的に呼び出す前にset_mode()
を使用できます。一方、transform_df()
およびtransform_pandas()
デコレーターは、DataFrame出力を保存するためにwrite_dataframe()
およびwrite_pandas()
を呼び出します。これにより、使用される書き込みモードはincremental()
デコレーターによって決定されます。
set_mode()
を使用する場合、トランスフォームがインクリメンタルまたは非インクリメンタルに実行される場合の両方でこれが有効な動作であることを確認する価値があります。そうでない場合は、is_incremental
プロパティを利用する必要があります。
書き込みモードに加えて、transforms.api.IncrementalTransformOutput
は、出力データセットからDataFrameを読み取ることを可能にします。これは、再びオプションの読み取りモードを取るdataframe()
メソッドを使用して行うことができます。デフォルトの読み取りモードはcurrent
に設定されており、他の利用可能な出力読み取りモードはadded
およびprevious
です。読み取りモードは、データセットの書き込みモードが設定されているものに応じて異なる動作をします。
デフォルトの読み取りモードはcurrent
ですが、ほとんどの場合、実際にはprevious
を使用することをお勧めします。他の読み取りモードは、データセットを書き込んだ後に読み取るために使用する必要があります。
トランスフォームはインクリメンタルに実行されている必要があります(ctx.is_incremental is True
)で、トランスフォームの前回の出力にアクセスすることができます。トランスフォームが非インクリメンタルに実行されている場合、通常は前回の出力にアクセスできる読み取りモードは前回の出力データの行を返しません。
出力読み取りモード | 出力書き込みモード | 新しいデータがまだ書き込まれているか? | 動作 |
---|---|---|---|
current | modify | No | これらの設定には使用ケースがありません。代わりにpreviousモードを使用してください。 |
current | modify | Yes | dataframe() は、トランスフォームの前回の出力と現在実行中のビルドに書き込まれたデータを返します。 |
current | replace | No | これらの設定には使用ケースがありません。代わりにpreviousモードを使用してください。 |
current | replace | Yes | dataframe() は、現在実行中のビルドに書き込まれたデータを返します。 |
added | modify /replace | No | これらの設定には使用ケースがありません。代わりにpreviousモードを使用してください。 |
added | modify /replace | Yes | dataframe() は、現在実行中のビルドに書き込まれたデータを返します。 |
previous | modify | Yes/No | dataframe() は、トランスフォームの前回の出力を返します。トランスフォームが非インクリメンタルに実行されている場合、previousで読み取るときにスキーマが必要です。 |
previous | replace | Yes/No | dataframe() は、トランスフォームの前回の出力を返します。トランスフォームが非インクリメンタルに実行されている場合、previousで読み取るときにスキーマが必要です。 |
前回のDataFrameを読み取るときに提供されるスキーマ ↗は、トランスフォームが非インクリメンタルに実行されている場合に空のDataFrameを生成するために使用されます。トランスフォームがインクリメンタルに実行されている場合、このスキーマは最後の出力の実際のスキーマと比較されます。列の型、列のnull可能性、または列の順序が一致しない場合、例外が発生します。列の順序が同じであることを確認するには、次の構造を使用できます:
Copied!1 2 3
previous = out.dataframe('previous', schema) # schemaはpyspark.sql.types.StructTypeです out.write_dataframe(df.select(schema.fieldNames()))
Foundryは、トランスフォームで使用されるスキーマに関係なく、すべての列をnullableとして保存します。その結果、previous
モードで出力から読み取るときに、いくつかのフィールドが非nullableに設定されたスキーマを提供すると、SchemaMismatchError
でビルドが失敗します。
詳細については、マージと置換の例を参照してください。
出力読み取りモード | 出力書き込みモード | 新しいデータがまだ書き込まれているか? |
---|---|---|
current またはadded | modify / replace | Yes |
意図を明確にするためにadded
を使用することをお勧めします。
現在の変換によって書き込まれたデータを読むことが有益なシナリオは、データをチェックして、チェックが通らなかった場合にビルドを失敗させることです。この方法では、データを再計算したり、Sparkでキャッシュしたりする必要がありません。
TransformContext
オブジェクトと比較して、IncrementalTransformContext
オブジェクトは追加のプロパティis_incremental
を提供します。このプロパティは、トランスフォームがインクリメンタルに実行されている場合にTrue
に設定されます。つまり:
modify
に設定され、added
に設定されます。インクリメンタルデコレーターを使用すると、読み取りモード「previous」を指定することで、トランスフォームの前回の入力および出力にアクセスできます。これにより、現在のビルドを過去のコンテキストに基づいて行うことができます。トランスフォームがスナップショットモードで実行される場合、「previous」DataFrameは空になります。これは、初回の実行であるか、ロジックやデータが大幅に変更されて再計算が必要なためです。
ただし、最も一般的なケースは、入力に「added」モードを使用し、出力に「modify」モードを使用することです。これらのモードはデフォルトで使用されます。これにより、入力データセットから新たに追加された行を取得し、それらを処理して出力データセットに追加することができます。
出力に行を追加する代わりに、出力データセットに既に存在する行を修正したい場合があります。そのためには、「replace」モードを使用します。一般的なシナリオの例で示されています。
茶色の髪の学生のみを含むように学生をフィルタリングするインクリメンタルトランスフォームを分析してみましょう:
Copied!1 2 3 4 5 6 7 8
@incremental() @transform( students=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def filter_hair_color(students, processed): students_df = students.dataframe() processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
/examples/students_hair_eye_color
入力データセットが新しい学生のセットで完全に置き換えられたとします。見ての通り、新しい学生のセットを前回の出力に追加すると、誤った出力データセットが生成されます。これは、incremental()
デコレーターがトランスフォームをインクリメンタルに実行しないことを決定する状況の例です。
トランスフォームをインクリメンタルに実行するためには、次の要件を満たす必要があります:
トランスフォームにincremental()
デコレーターがあるが、上記の要件のいずれかが満たされない場合、トランスフォームは自動的に非インクリメンタルに実行されます。
これにより、デフォルトの出力書き込みモードがmodify
ではなくreplace
に設定され、入力が非インクリメンタルに提示されます。トランスフォームでの出力の読み取りは、以前の履歴にアクセスできないため、空のDataFrameを返します。同様に、入力も非インクリメンタルに提示されます。require_incremental=True
を設定すると、トランスフォームは非インクリメンタルに実行される代わりに失敗します。
特定の入力を完全に書き換えることができ、トランスフォームをインクリメンタルに実行する能力に影響を与えないことが望ましい場合があります。詳細はスナップショット入力を参照してください。
トランスフォームをインクリメンタルにのみ実行する(以前に実行されたことがない場合やセマンティックバージョンが変更された場合を除く)ことを強制するには、incremental
デコレーターにrequire_incremental=True
引数を渡します。トランスフォームがインクリメンタルに実行できない場合、非インクリメンタルに実行しようとする代わりに故意に失敗します。
トランスフォームは、すべてのインクリメンタル入力が前回の実行以降に追加ファイルのみを持っていた場合(APPEND
またはUPDATE
トランザクションによって)インクリメンタルに実行できます。
逆に、トランスフォームは、いずれかのインクリメンタル入力が
SNAPSHOT
トランザクションを持っていた場合、UPDATE
またはDELETE
トランザクションを通じて更新または削除されたファイルを持っていた場合、インクリメンタルに実行できません。たとえば、students_hair_eye_color
の学生リストが完全に変更された場合、フィルタリングされた学生の前回の出力は無効であり、置き換えられる必要があります。
上流のデータセットが無限に成長し、Foundry Retentionを使用して古い行を削除しても下流の計算のインクリメンタリティに影響を与えないようにしたい場合、そのデータセットに依存するインクリメンタルトランスフォームは、保持された入力を許可するように明示的に設定する必要があります。これは、transforms.api.incremental
デコレーターのallow_retention
引数を使用して行うことができます。
True
に設定すると、Foundry Retentionからの削除がインクリメンタリティを維持するかどうかを評価する際に無視されます。これにより、Retentionからのremoved
入力はインクリメンタリティを損なわず、唯一の非added
入力が保持された行を持つ入力である場合、トランスフォームはインクリメンタルに実行されます。False
(デフォルト)の場合、入力データセットのremoved
タイプの変更は、トランスフォームがスナップショットを実行する原因となります。Copied!1 2 3 4 5 6 7 8
@incremental(allow_retention=True) @transform( students=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def filter_hair_color(students, processed): students_df = students.dataframe() processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
上記の例では、/examples/students_hair_eye_color
データセットでの変更セットにadded
の変更とFoundry Retentionを使用したremoved
の変更のみが含まれている場合、トランスフォームはインクリメンタルに実行されます。他の方法で行われたremoved
の変更やmodified
の変更が存在する場合、スナップショットがトリガーされます。
allow_retention=True
を指定すると、Foundry Retentionからのremoved
変更のみがインクリメンタリティに影響を与えないようにします。入力データセットの他の削除は、インクリメンタル計算の代わりにスナップショットを実行させる原因となります。
入力を完全に書き換えることが許可されており、トランスフォームのインクリメンタリティを無効にしないシナリオがあります。たとえば、電話番号の国コードを国にマッピングする単純な参照データセットがあり、これが定期的に書き換えられるとします。このデータセットの変更は、以前の計算の結果を必ずしも無効にしないため、トランスフォームがインクリメンタルに実行されることを妨げるべきではありません。
デフォルトでは、上記のように、トランスフォームは、トランスフォームが最後に実行された以降に入力が完全に書き換えられた場合、インクリメンタルに実行できません。スナップショット入力はこのチェックから除外され、実行間で開始トランザクションが異なることが許可されます。
スナップショット入力は、incremental()
デコレーターのsnapshot_inputs
引数を使用して構成できます。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
@incremental(snapshot_inputs=['country_codes']) @transform( phone_numbers=Input('/examples/phone_numbers'), country_codes=Input('/examples/country_codes'), output=Output('/examples/phone_numbers_to_country') ) def map_phone_number_to_country(phone_numbers, country_codes, output): # type: (TransformInput, TransformInput, TransformOutput) -> None # これは前回の実行以降に見られていないすべての電話番号です phone_numbers = phone_numbers.dataframe() # これは、以前に見たことがあるかどうかに関係なく、すべての国コードです country_codes = country_codes.dataframe() cond = [phone_numbers.country_code == country_codes.code] output.write_dataframe(phone_numbers.join(country_codes, on=cond, how='left_outer'))
スナップショット入力の動作は、トランスフォームがインクリメンタルに実行される場合と非インクリメンタルに実行される場合で同じです。そのため、added
およびcurrent
読み取りモードは常にデータセット全体を返します。他のすべての読み取りモードは空のデータセットを返します。
スナップショット入力の以前に見たバージョンに制約がないため、トランスフォームをインクリメンタルに実行する能力を保持しながら、スナップショット入力を追加または削除することができます。入力の変更がトランスフォームのセマンティクスを根本的に変更する場合、incremental()
デコレーターのsemantic_version
引数を更新する価値があるかどうかを検討する価値があります。
既存の入力リストを変更できます。インクリメンタリティは次の場合に保持されます:
また、非スナップショット入力データセットの開始トランザクションが前回の実行で使用されたものと一致している必要があります。
複数出力のインクリメンタルトランスフォームの場合、各出力の最後にコミットされたトランザクションは同じトランスフォームから生成されたものでなければなりません。
トランスフォームは、すべてのインクリメンタル入力がファイルのみを追加した場合、またはファイルが削除された場合、allow_retention=True
でFoundry Retentionを使用して削除された場合にのみ、インクリメンタルに実行できます。スナップショット入力はこのチェックから除外されます。