注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
このガイドの残りの部分では、インクリメンタルビルドと非インクリメンタルビルドについて参照します。すべての場合で、 incremental()
デコレーターが使用されていると仮定されます。したがって、この用語は、変換が実際にインクリメンタルに実行されるかどうかを指します。
incremental()
デコレーターは、インクリメンタル計算を有効にするロジックで変換の計算機能をラップするために使用できます。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
@incremental() @transform( students=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def incremental_filter(students, processed): # type: (IncrementalTransformInput, IncrementalTransformOutput) -> None # まだ見ていない行のみを読みます。 new_students_df = students.dataframe() # これは students.dataframe('added') と同等です # 非インクリメンタルの場合、すべての行を読み出力を置き換えます。 # インクリメンタルの場合、新しい行のみを読み、それらを出力に追加します。 processed.write_dataframe( new_students_df.filter(new_students_df.hair == 'Brown') # ヘアカラーがブラウンの生徒だけをフィルタリングします )
Copied!1 2 3 4 5
# schemaはpyspark.sql.types.StructTypeです previous = out.dataframe('previous', schema) # 以前のデータフレームを取得 # データフレームを選択して書き込みます out.write_dataframe(df.select(schema.fieldNames()))
Foundry は、ユーザーの変換で使用されるスキーマに関係なく、すべての行を nullable として保存します。その結果、一部のフィールドが non-nullable に設定されたスキーマを提供すると、 previous
モードで出力から読み取る際に SchemaMismatchError
が発生し、ビルドが失敗します。
詳しくは、Merge and replace の例をご覧ください。
Output Read Mode | Output Write Mode | 新しいデータがすでに書き込まれていますか? |
---|---|---|
current または added | modify / replace | はい |
added
を使用することをおすすめします。これによりユーザーの意図が明確になります。
現在の変換によって書き込まれたデータを読み取ることが有益なシナリオは、データに対してチェックを実行し、チェックがパスしない場合はビルドを失敗させることです。これにより、データを再計算する必要も、チェックを実行するために Spark にキャッシュする必要もありません。
TransformContext
オブジェクトと比較して、 IncrementalTransformContext
オブジェクトは追加のプロパティ: is_incremental
を提供します。このプロパティは、変換が増分的に実行される場合に True
に設定され、次の意味を持ちます:
modify
に設定されている、およびadded
に設定されています。増分デコレーターを使用すると、読み取りモード “previous” を指定することで変換の前回の入力と出力にアクセスできます。これにより、現在のビルドを歴史的なコンテキストに基づいて行うことができます。変換がスナップショットモードで実行されると、“previous” データフレームは空になります。これは、これが最初の実行であるか、ロジックやデータが大幅に変更されて再計算が必要になったためです。
しかし、最も一般的なケースは、入力に “added” モードを使用し、出力に “modify” モードを使用することです。これらのモードはデフォルトで使用されます。これにより、新しく追加された行を入力データセットから取得し、それらを処理し、出力データセットに追加することができます。
出力に行を追加する代わりに、出力データセットにすでに存在する一部の既存の行を変更することがあります。そのためには、一般的なシナリオの例で示されているように、“replace” モードを使用します。
茶髪の学生のみを含むように学生をフィルター処理する増分変換を分析してみましょう:
例えば、/examples/students_hair_eye_color
入力データセットが新しい生徒のセットで完全に置き換えられたとします。ここで、新しい生徒のセットを前の出力結果に追加すると、出力データセットが誤っていることがわかります。これは、incremental()
デコレータがトランスフォームをインクリメンタルに実行しないことを決定する状況の例です。
トランスフォームがインクリメンタルに実行されるためには、以下の要件が満たされている必要があります。
トランスフォームに incremental()
デコレータがあるが、上記のいずれかの要件が満たされていない場合、トランスフォームは自動的に非インクリメンタルに実行されます。
これは、デフォルトの出力書き込みモードが modify
から replace
に設定され、入力が非インクリメンタルに提示されることを意味します。また、トランスフォームで出力を読み取ると、空のデータフレームが返されるため、前の履歴にアクセスできません。同様に、入力も非インクリメンタルに提示されます。require_incremental=True
を設定すると、トランスフォームは非インクリメンタルに実行されるのではなく、失敗します。
特定の入力を完全に書き換えても、トランスフォームをインクリメンタルに実行できるようにすることがよく望まれます。詳細については、スナップショット入力を参照してください。
require_incremental=True
引数を incremental
デコレータに渡すことで、トランスフォームをインクリメンタルに実行することができます(ただし、これまでに実行されたことがない場合やセマンティックバージョンが上がった場合を除く)。トランスフォームがインクリメンタルに実行できない場合は、非インクリメンタルに実行しようとせずに意図的に失敗します。
トランスフォームは、インクリメンタル入力がすべて、最後の実行以降にファイルが追加された状態(APPEND
または UPDATE
トランザクションを使用)である場合、インクリメンタルに実行できます。
逆に、インクリメンタル入力のいずれかが以下の状態の場合、トランスフォームはインクリメンタルに実行できません。
SNAPSHOT
トランザクションがあった場合UPDATE
または DELETE
トランザクションがあった場合。例えば、students_hair_eye_color
の生徒のリストが完全に変更された場合、フィルター処理された生徒の前の出力は無効であり、置き換える必要があります。
上流データセットが無限に増え続ける場合、古い行を削除することができます(Foundry Retention を使用)が、下流の計算のインクリメンタリティに影響を与えないように、そのデータセットに依存するインクリメンタルトランスフォームは、明示的に retained 入力を許可するように設定する必要があります。これは、transforms.api.incremental
デコレータの allow_retention
引数を使用して行うことができます。
True
に設定されている場合、Foundry Retention からの削除がインクリメンタリティの維持を評価する際に無視されます。つまり、Retention からの removed
入力はインクリメンタリティを損なわず、唯一の non-added
入力が retained 行の入力である場合でも、トランスフォームはインクリメンタルに実行されます。False
(デフォルト)の場合、入力データセットの任意の removed
タイプの変更によって、トランスフォームはスナップショットを実行します。
上記の例では、データセット /examples/students_hair_eye_color
の変更セット後に変換が実行され、その変更セットには Foundry Retention を使用した 追加された
変更と 削除された
変更のみが含まれている場合、変換は増分的に実行されます。他の方法で行われた 削除された
変更や 修正された
変更が存在する場合、スナップショットがトリガーされます。allow_retention=True
を指定しても、Foundry Retention から来る 削除された
変更の増分性への影響だけが防止されます。入力データセットの他の削除は依然として変換が増分計算の代わりにスナップショットを実行する原因となります。
入力が全面的に書き換えられることが許されるシナリオがあり、その場合でも変換の増分性は無効になりません。例えば、電話番号の国コードを国にマッピングする簡単な参照データセットを持っていて、これが定期的に書き換えられるとします。このデータセットへの変更は必ずしも以前の計算の結果を無効にするわけではないため、変換が増分的に実行されるのを防ぐべきではありません。
デフォルトでは、上記で説明したように、変換は最後に実行された後に任意の入力が全面的に書き換えられた場合、増分的に実行することはできません。スナップショット入力はこのチェックから除外され、実行間での開始トランザクションが異なってもよいとされます。
スナップショット入力は、incremental()
デコレーターの snapshot_inputs
引数を使用して設定することができます。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
@incremental(snapshot_inputs=['country_codes']) @transform( phone_numbers=Input('/examples/phone_numbers'), country_codes=Input('/examples/country_codes'), output=Output('/examples/phone_numbers_to_country') ) def map_phone_number_to_country(phone_numbers, country_codes, output): # type: (TransformInput, TransformInput, TransformOutput) -> None # 前回の実行以降に見つかったすべての新しい電話番号です phone_numbers = phone_numbers.dataframe() # 以前に見たものに関係なく、すべての国コードです country_codes = country_codes.dataframe() # 電話番号の国コードが国コードのコードと一致する条件です cond = [phone_numbers.country_code == country_codes.code] # 左外部結合を使用して、電話番号と国コードを結合し、結果を出力します output.write_dataframe(phone_numbers.join(country_codes, on=cond, how='left_outer'))
スナップショット入力の動作は、変換が増分的に実行される場合と非増分的に実行される場合とで同一です。そのため、added
および current
読み取りモードは常にデータセット全体を返します。他のすべての読み取りモードは空のデータセットを返します。
スナップショット入力の以前に見たバージョンに関して制約がないため、スナップショット入力を追加または削除しながら、変換を増分的に実行する能力を保持することが可能です。入力の変更が変換のセマンティクスを根本的に変更する場合、incremental()
デコレーターの semantic_version
引数を更新するかどうかを再確認する価値があります。
既存の入力リストは変更可能です。次の場合に増分性が保持されます:
また、非スナップショット入力データセットの各開始トランザクションが、前回の実行で使用されたものと一致する必要があります。
複数出力の増分的な変換の場合、それぞれの出力に対する最後にコミットされたトランザクションは、同じ変換から生成されていなければなりません。
変換は、すべての増分入力がファイルの追加のみであった場合、またはファイルが削除された場合には、それらのファイルが Foundry Retention で allow_retention=True
を使用して削除された場合にのみ、増分的に実行できます。スナップショット入力はこのチェックから除外されます。