注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

インクリメンタル変換リファレンス

インクリメンタルデコレーター

Tip

このガイドの残りの部分では、インクリメンタルビルドと非インクリメンタルビルドについて参照します。すべての場合で、 incremental() デコレーターが使用されていると仮定されます。したがって、この用語は、変換が実際にインクリメンタルに実行されるかどうかを指します。

incremental() デコレーターは、インクリメンタル計算を有効にするロジックで変換の計算機能をラップするために使用できます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @incremental() @transform( students=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def incremental_filter(students, processed): # type: (IncrementalTransformInput, IncrementalTransformOutput) -> None # まだ見ていない行のみを読みます。 new_students_df = students.dataframe() # これは students.dataframe('added') と同等です # 非インクリメンタルの場合、すべての行を読み出力を置き換えます。 # インクリメンタルの場合、新しい行のみを読み、それらを出力に追加します。 processed.write_dataframe( new_students_df.filter(new_students_df.hair == 'Brown') # ヘアカラーがブラウンの生徒だけをフィルタリングします )
Copied!
1 2 3 4 5 # schemaはpyspark.sql.types.StructTypeです previous = out.dataframe('previous', schema) # 以前のデータフレームを取得 # データフレームを選択して書き込みます out.write_dataframe(df.select(schema.fieldNames()))

Foundry は、ユーザーの変換で使用されるスキーマに関係なく、すべての行を nullable として保存します。その結果、一部のフィールドが non-nullable に設定されたスキーマを提供すると、 previous モードで出力から読み取る際に SchemaMismatchError が発生し、ビルドが失敗します。

詳しくは、Merge and replace の例をご覧ください。

現在のランで書き込まれたデータを読み取る、有効な組み合わせ:

Output Read ModeOutput Write Mode新しいデータがすでに書き込まれていますか?
current または addedmodify / replaceはい

added を使用することをおすすめします。これによりユーザーの意図が明確になります。 現在の変換によって書き込まれたデータを読み取ることが有益なシナリオは、データに対してチェックを実行し、チェックがパスしない場合はビルドを失敗させることです。これにより、データを再計算する必要も、チェックを実行するために Spark にキャッシュする必要もありません。

IncrementalTransformContext

TransformContext オブジェクトと比較して、 IncrementalTransformContext オブジェクトは追加のプロパティ: is_incremental を提供します。このプロパティは、変換が増分的に実行される場合に True に設定され、次の意味を持ちます:

  • デフォルトの出力書き込みモードが modify に設定されている、および
  • 入力のデフォルトの読み取りモードが added に設定されています。

増分モードの概要

増分デコレーターを使用すると、読み取りモード “previous” を指定することで変換の前回の入力と出力にアクセスできます。これにより、現在のビルドを歴史的なコンテキストに基づいて行うことができます。変換がスナップショットモードで実行されると、“previous” データフレームは空になります。これは、これが最初の実行であるか、ロジックやデータが大幅に変更されて再計算が必要になったためです。

しかし、最も一般的なケースは、入力に “added” モードを使用し、出力に “modify” モードを使用することです。これらのモードはデフォルトで使用されます。これにより、新しく追加された行を入力データセットから取得し、それらを処理し、出力データセットに追加することができます。

出力に行を追加する代わりに、出力データセットにすでに存在する一部の既存の行を変更することがあります。そのためには、一般的なシナリオの例で示されているように、“replace” モードを使用します。

増分計算の要件

茶髪の学生のみを含むように学生をフィルター処理する増分変換を分析してみましょう: 例えば、/examples/students_hair_eye_color 入力データセットが新しい生徒のセットで完全に置き換えられたとします。ここで、新しい生徒のセットを前の出力結果に追加すると、出力データセットが誤っていることがわかります。これは、incremental()デコレータがトランスフォームをインクリメンタルに実行しないことを決定する状況の例です。 トランスフォームがインクリメンタルに実行されるためには、以下の要件が満たされている必要があります。

トランスフォームに incremental() デコレータがあるが、上記のいずれかの要件が満たされていない場合、トランスフォームは自動的に非インクリメンタルに実行されます。 これは、デフォルトの出力書き込みモードが modify から replace に設定され、入力が非インクリメンタルに提示されることを意味します。また、トランスフォームで出力を読み取ると、空のデータフレームが返されるため、前の履歴にアクセスできません。同様に、入力も非インクリメンタルに提示されます。require_incremental=True を設定すると、トランスフォームは非インクリメンタルに実行されるのではなく、失敗します。

特定の入力を完全に書き換えても、トランスフォームをインクリメンタルに実行できるようにすることがよく望まれます。詳細については、スナップショット入力を参照してください。

Tip

require_incremental=True 引数を incremental デコレータに渡すことで、トランスフォームをインクリメンタルに実行することができます(ただし、これまでに実行されたことがない場合やセマンティックバージョンが上がった場合を除く)。トランスフォームがインクリメンタルに実行できない場合は、非インクリメンタルに実行しようとせずに意図的に失敗します。

追加のみの入力変更

トランスフォームは、インクリメンタル入力がすべて、最後の実行以降にファイルが追加された状態(APPEND または UPDATE トランザクションを使用)である場合、インクリメンタルに実行できます。

逆に、インクリメンタル入力のいずれかが以下の状態の場合、トランスフォームはインクリメンタルに実行できません。

  • 完全に書き換えられた状態であること、例えば、SNAPSHOT トランザクションがあった場合
  • 更新されたファイルや削除されたファイルがある状態であること、例えば、UPDATE または DELETE トランザクションがあった場合。

例えば、students_hair_eye_color の生徒のリストが完全に変更された場合、フィルター処理された生徒の前の出力は無効であり、置き換える必要があります。

Foundry Retention からの削除がある入力

上流データセットが無限に増え続ける場合、古い行を削除することができます(Foundry Retention を使用)が、下流の計算のインクリメンタリティに影響を与えないように、そのデータセットに依存するインクリメンタルトランスフォームは、明示的に retained 入力を許可するように設定する必要があります。これは、transforms.api.incremental デコレータの allow_retention 引数を使用して行うことができます。

  • このフィールドが True に設定されている場合、Foundry Retention からの削除がインクリメンタリティの維持を評価する際に無視されます。つまり、Retention からの removed 入力はインクリメンタリティを損なわず、唯一の non-added 入力が retained 行の入力である場合でも、トランスフォームはインクリメンタルに実行されます。
  • フィールドが False(デフォルト)の場合、入力データセットの任意の removed タイプの変更によって、トランスフォームはスナップショットを実行します。 上記の例では、データセット /examples/students_hair_eye_color の変更セット後に変換が実行され、その変更セットには Foundry Retention を使用した 追加された 変更と 削除された 変更のみが含まれている場合、変換は増分的に実行されます。他の方法で行われた 削除された 変更や 修正された 変更が存在する場合、スナップショットがトリガーされます。
警告

allow_retention=True を指定しても、Foundry Retention から来る 削除された 変更の増分性への影響だけが防止されます。入力データセットの他の削除は依然として変換が増分計算の代わりにスナップショットを実行する原因となります。

スナップショット入力

入力が全面的に書き換えられることが許されるシナリオがあり、その場合でも変換の増分性は無効になりません。例えば、電話番号の国コードを国にマッピングする簡単な参照データセットを持っていて、これが定期的に書き換えられるとします。このデータセットへの変更は必ずしも以前の計算の結果を無効にするわけではないため、変換が増分的に実行されるのを防ぐべきではありません。

デフォルトでは、上記で説明したように、変換は最後に実行された後に任意の入力が全面的に書き換えられた場合、増分的に実行することはできません。スナップショット入力はこのチェックから除外され、実行間での開始トランザクションが異なってもよいとされます。

スナップショット入力は、incremental() デコレーターの snapshot_inputs 引数を使用して設定することができます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @incremental(snapshot_inputs=['country_codes']) @transform( phone_numbers=Input('/examples/phone_numbers'), country_codes=Input('/examples/country_codes'), output=Output('/examples/phone_numbers_to_country') ) def map_phone_number_to_country(phone_numbers, country_codes, output): # type: (TransformInput, TransformInput, TransformOutput) -> None # 前回の実行以降に見つかったすべての新しい電話番号です phone_numbers = phone_numbers.dataframe() # 以前に見たものに関係なく、すべての国コードです country_codes = country_codes.dataframe() # 電話番号の国コードが国コードのコードと一致する条件です cond = [phone_numbers.country_code == country_codes.code] # 左外部結合を使用して、電話番号と国コードを結合し、結果を出力します output.write_dataframe(phone_numbers.join(country_codes, on=cond, how='left_outer'))

スナップショット入力の動作は、変換が増分的に実行される場合と非増分的に実行される場合とで同一です。そのため、added および current 読み取りモードは常にデータセット全体を返します。他のすべての読み取りモードは空のデータセットを返します。

スナップショット入力の以前に見たバージョンに関して制約がないため、スナップショット入力を追加または削除しながら、変換を増分的に実行する能力を保持することが可能です。入力の変更が変換のセマンティクスを根本的に変更する場合、incremental() デコレーターの semantic_version 引数を更新するかどうかを再確認する価値があります。

入力への変更

既存の入力リストは変更可能です。次の場合に増分性が保持されます:

  • 新しい入力または新しいスナップショット入力が追加された場合、または
  • 既存の入力または既存のスナップショット入力が削除された場合。 増分的な変換には少なくとも1つの入力が必要であることに注意してください。

また、非スナップショット入力データセットの各開始トランザクションが、前回の実行で使用されたものと一致する必要があります。

同じ変換によって最後に構築された出力

複数出力の増分的な変換の場合、それぞれの出力に対する最後にコミットされたトランザクションは、同じ変換から生成されていなければなりません。

増分計算の要件の要約

変換は、すべての増分入力がファイルの追加のみであった場合、またはファイルが削除された場合には、それらのファイルが Foundry Retention で allow_retention=True を使用して削除された場合にのみ、増分的に実行できます。スナップショット入力はこのチェックから除外されます。