注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
このガイドの残りの部分では、インクリメンタルビルドと非インクリメンタルビルドについて参照します。すべての場合で、 incremental()
デコレーターが使用されていると仮定されます。したがって、この用語は、変換が実際にインクリメンタルに実行されるかどうかを指します。
incremental()
デコレーターは、インクリメンタル計算を有効にするロジックで変換の計算機能をラップするために使用できます。
Copied!1 2 3 4 5 6 7 8 9 10 11
from transforms.api import transform, incremental, Input, Output @incremental() @transform( students=Input('/examples/students_hair_eye_color'), # studentsというInputパスを設定します processed=Output('/examples/hair_eye_color_processed') # processedというOutputパスを設定します ) def filter_hair_color(students, processed): # type: (IncrementalTransformInput, IncrementalTransformOutput) -> None students_df = students.dataframe('added') # 追加されたデータをデータフレームとして取得します processed.write_dataframe(students_df.filter(students_df.hair == 'Brown')) # 髪の色がブラウンの学生のみをフィルタリングして保存します
incremental()
デコレータは、transform()
, transform_df()
, transform_pandas()
デコレータを使用する既存の変換に対して使用できます。ただし、ユーザーの変換のための計算関数は、インクリメンタルかつ非インクリメンタルで実行できるようにサポートされている必要があります。「incremental()」デコレータは、2つの主要なことを行います。
TransformInput
はIncrementalTransformInput
になり、TransformOutput
はIncrementalTransformOutput
になり、TransformContext
はIncrementalTransformContext
になります。これらのインクリメンタルオブジェクトは、デコレータでラップされた変換に渡されます。インクリメンタルデコレータは5つの引数を取ります。
Copied!1 2 3 4 5 6 7 8 9 10 11 12
# transforms.api.incrementalは、データ変換の実行を制御するための関数です。 transforms.api.incremental( # require_incrementalは、インクリメンタルな実行を必要とするかどうかを指定します。Falseに設定すると、常にフルロードが強制されます。 require_incremental=False, # semantic_versionは、変換のバージョンを指定します。ここでは1を指定しています。 semantic_version=1, # snapshot_inputsは、スナップショット入力を指定します。Noneを指定すると、すべての入力がスナップショットになります。 snapshot_inputs=None, # allow_retentionは、保存ポリシーを許可するかどうかを指定します。Falseに設定すると、保存ポリシーは無視されます。 allow_retention=False, # strict_appendは、厳密な追加を行うかどうかを指定します。Falseに設定すると、レコードは変更されずに追加されます。 strict_append=False)
require_incremental
引数を True
に設定すると、インクリメンタル実行ができない場合、変換が失敗します。require_incremental=True
の場合でも、スナップショットとして実行が許可されるケースが 2 つあります。
インクリメンタル実行ができない原因を調べるには、ドライバーログの警告transforms.api._incremental: Not running incrementally
を確認してください。
:func:~transforms.api.incremental
デコレーターの semantic_version
引数は、次の変換の実行を非インクリメンタルにすることができます。
@incremental()
デコレーターの semantic_version
引数を上げます。
snapshot_inputs
引数は、スナップショット入力としていくつかの入力を定義することができ、これらの入力は、非スナップショット入力とは異なり、更新や削除の変更をサポートします。詳細については、スナップショット入力 を参照してください。
allow_retention
引数を true
に設定すると、Foundry Retention による入力および出力データセット内のファイルの削除が許可され、変換のインクリメンタル性が維持されます。
strict_append
パラメーターが true
に設定されている場合、基礎となる Foundry トランザクション のタイプは APPEND
に設定され、インクリメンタルな書き込みに APPEND
トランザクションが使用されます。書き込み操作では、Parquet のサマリーメタデータや Hadoop の SUCCESS ファイルなどの補助ファイルも含め、どのファイルも上書きされないことに注意してください。すべての Foundry 形式は、このモードをサポートするはずです。
上記で説明したように、incremental()
デコレーターでラップされた変換の計算関数は、インクリメンタル実行と非インクリメンタル実行の両方をサポートする必要があります。デフォルトの読み込みモードと書き込みモード(このページの後半で詳しく説明されています)は、この二重ロジック要件をサポートするのに役立ちますが、計算コンテキスト の is_incremental
プロパティに基づいて分岐する必要があるかもしれません。
もうひとつの重要な点は、incremental()
デコレーターを transform_df()
または transform_pandas()
と一緒に使用すると、デフォルトの読み書きモードのみが利用可能になることです。これは、追加された入力行のみに関数の追加出力行がある変換(追加の例 を参照)に対しては十分です。ただし、入力読み込みモードや出力書き込みモードを設定する必要があるより複雑なロジック(結合、集計、重複排除など)を実行する変換がある場合は、incremental()
デコレーターを transform()
と一緒に使用する必要があります。インクリメンタルデコレーターを transform()
と一緒に使用することで、読み書きモードを設定できます。
コードリポジトリのプレビュー機能では、すべての変換が非インクリメンタルモードで実行されることに注意してください。これは、incremental()
デコレーターに require_incremental=True
が渡された場合でも同様です。
transforms.api.IncrementalTransformInput
オブジェクトは、オプションの読み込みモードで dataframe()
メソッドを拡張します。
オプションの入力読み込みモードパラメーターは、transform()
デコレーターを使用している場合にのみ利用できます。transform_df()
および transform_pandas()
デコレーターは、引数なしで入力の dataframe()
および pandas()
を呼び出し、PySpark および Pandas DataFrame オブジェクトを抽出します。これは、読み込みモードが常にデフォルトの added
モードであることを意味します。
インクリメンタルデコレーターを使用して変換を定義する場合、変換がインクリメンタル実行されるか非インクリメンタル実行されるかによって、読み込みモードの動作が異なります。
読み込みモード | インクリメンタル動作 | 非インクリメンタル動作 |
---|---|---|
added * | 変換が前回実行されてから追加された入力の新しい行を含む DataFrame を返します。 | すべての行が未見と見なされるため、データセット全体を含む DataFrame を返します。 |
previous | 変換が前回実行されたときに与えられた入力全体を含む DataFrame を返します。 | 空の DataFrame を返します。 |
current | 現在の実行の入力データセット全体を含む DataFrame を返します。 | 現在の実行の入力データセット全体を含む DataFrame を返します。これは added と同じになります。 |
デフォルトの読み込みモードは added
です。
インクリメンタル方式で扱われることが望ましくない入力がある場合がありますが、変換が incremental()
としてマークされている場合があります。このような入力の詳細や、読み込みモードの動作がこれらのタイプの入力にどのように異なるかについては、スナップショット入力 を参照してください。
出力のデフォルトの読み込みモードは current
であり、利用可能な出力の読み込みモードは added
、current
、および previous
です。出力の読み込みモードの詳細については、以下のセクションを参照してください。
インクリメンタル変換の性質上、前回の SNAPSHOT
トランザクションから入力データセットの過去のすべてのトランザクションを読み込んで入力ビューを構築します。インクリメンタル変換で段階的な遅延が見られるようになった場合は、インクリメンタル入力データセットで SNAPSHOT ビルドを実行することをお勧めします。
transforms.api.IncrementalTransformOutput
オブジェクトは、出力データセットの読み込みモードと書き込みモードにアクセスできます。出力データセットのデフォルトの書き込みモードである modify
を使用して、インクリメンタルビルドと非インクリメンタルビルドの両方と互換性のあるロジックを書くことが重要です。書き込みモードは 2 つあります。
modify
:このモードでは、ビルド中に書き込まれたデータで既存の出力を変更します。たとえば、出力が modify
モードにある場合、write_dataframe()
を呼び出すと、書き込まれた DataFrame
が既存の出力に追加されます。replace
:このモードでは、ビルド中に書き込まれたデータで出力を完全に置き換えます。変換がインクリメンタルに実行されるということは、出力のデフォルトの書き込みモードが modify
に設定されていることを意味します。同様に、変換が非インクリメンタルに実行されるということは、出力のデフォルトの書き込みモードが replace
に設定されていることを意味します。
入力 DataFrame のデフォルトの読み込みモードが added
であることを思い出してください。added
のデフォルトの入力読み込みモードと出力のデフォルトの書き込みモードである modify
のおかげで、インクリメンタルビルドと非インクリメンタルビルドの両方と互換性のあるロジックを書くことがはるかに簡単になります。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
@incremental() @transform( students=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def incremental_filter(students, processed): # type: (IncrementalTransformInput, IncrementalTransformOutput) -> None # まだ見ていない行のみを読みます。 new_students_df = students.dataframe() # これは students.dataframe('added') と同等です # 非インクリメンタルの場合、すべての行を読み出力を置き換えます。 # インクリメンタルの場合、新しい行のみを読み、それらを出力に追加します。 processed.write_dataframe( new_students_df.filter(new_students_df.hair == 'Brown') # ヘアカラーがブラウンの生徒だけをフィルタリングします )
増分計算のより複雑な使用例では、正しい書き込みモードを計算し、手動で設定する必要があるかもしれません。これは、増分出力で set_mode()
メソッドを使用して行うことができます。
出力書き込みモードは、transform()
デコレーターを使用している場合に限り、手動で設定することができます。このデコレーターを使用すると、出力を保存する前に set_mode()
を使用できます。一方、transform_df()
および transform_pandas()
デコレーターは、write_dataframe()
および write_pandas()
を呼び出して DataFrame 出力を保存します。これは、使用される書き込みモードが incremental()
デコレーターによって決定されることを意味します。
set_mode()
を使用する場合、変換が増分または非増分で実行される場合にこれが有効な動作であることを確認することが重要です。そうでない場合は、is_incremental
プロパティを使用する必要があります。
書き込みモードに加えて、transforms.api.IncrementalTransformOutput
では、出力データセットから DataFrame を読み込むことができます。これは、オプションの読み込みモードを引数に取る dataframe()
メソッドを使用して行うことができます。デフォルトの読み込みモードは current
に設定されており、他の利用可能な出力読み込みモードは added
および previous
です。読み込みモードは、データセットの書き込みモードが何に設定されているかによって異なる動作をします。
デフォルトの読み込みモードは current
ですが、ほとんどの場合、実際には previous
を使用したいです。他の読み込みモードは、それに書き込んだ後のデータセットを読み込むために使用されるべきです。
前回の出力からデータを読み込むには、変換を増分モードで実行する必要があります(ctx.is_incremental is True
)。そうでない場合、dataframe は空になります。
出力読み込みモード | 出力書き込みモード | 新しいデータがすでに書き込まれていますか? | 動作 |
---|---|---|---|
current | modify | いいえ | dataframe() は変換の前回の出力を返します。 |
current | modify | はい | dataframe() は変換の前回の出力と現在実行中のビルドで出力に書き込まれたデータを返します。 |
current | replace | いいえ | これらの設定は無効であり、予期しない動作を引き起こす可能性があります。異なるスキーマを持つ新しい入力で前回の出力をマージして置き換えたい場合は、スキーマ変更とマージおよび置換の例を参照してください。 |
current | replace | はい | dataframe() は現在実行中のビルドで出力に書き込まれたデータを返します。 |
added | modify /replace | いいえ | これらの設定の使用例はありません。代わりに previous モードを使用してください。 |
added | modify /replace | はい | dataframe() は現在実行中のビルドで出力に書き込まれたデータを返します。 |
previous | modify | はい/いいえ | dataframe() は変換の前回の出力を返します。スキーマは previous で読み込む際の必須フィールドです。 |
previous | replace | はい/いいえ | dataframe() は変換の前回の出力を返します。スキーマは previous で読み込む際の必須フィールドです。 |
前回の dataframe を取得するために current
を使用する場合、schema
を提供する必要はありません。これは、current がすでに構築されているはずの出力のスキーマを使用するからです。
しかし、current
モードは previous
よりも脆弱です。current
モードは次の場合に失敗します:
dataframe
を呼び出す前に write_mode
を modify
にオーバーライドしなかった場合DataFrame
を構築できない場合。前回の dataframe を読み込むときに提供する スキーマ は、最後の出力の実際のスキーマと比較されます。行の型、行の nullability、または行の順序が一致しない場合、例外が発生します。行の順序が同じままであることを確認するために、次の構造を使用してください:
Copied!1 2 3 4 5
# schemaはpyspark.sql.types.StructTypeです previous = out.dataframe('previous', schema) # 以前のデータフレームを取得 # データフレームを選択して書き込みます out.write_dataframe(df.select(schema.fieldNames()))
Foundry は、ユーザーの変換で使用されるスキーマに関係なく、すべての行を nullable として保存します。その結果、一部のフィールドが non-nullable に設定されたスキーマを提供すると、 previous
モードで出力から読み取る際に SchemaMismatchError
が発生し、ビルドが失敗します。
詳しくは、Merge and replace の例をご覧ください。
Output Read Mode | Output Write Mode | 新しいデータがすでに書き込まれていますか? |
---|---|---|
current または added | modify / replace | はい |
added
を使用することをおすすめします。これによりユーザーの意図が明確になります。
現在の変換によって書き込まれたデータを読み取ることが有益なシナリオは、データに対してチェックを実行し、チェックがパスしない場合はビルドを失敗させることです。これにより、データを再計算する必要も、チェックを実行するために Spark にキャッシュする必要もありません。
TransformContext
オブジェクトと比較して、 IncrementalTransformContext
オブジェクトは追加のプロパティ: is_incremental
を提供します。このプロパティは、変換が増分的に実行される場合に True
に設定され、次の意味を持ちます:
modify
に設定されている、およびadded
に設定されています。増分デコレーターを使用すると、読み取りモード “previous” を指定することで変換の前回の入力と出力にアクセスできます。これにより、現在のビルドを歴史的なコンテキストに基づいて行うことができます。変換がスナップショットモードで実行されると、“previous” データフレームは空になります。これは、これが最初の実行であるか、ロジックやデータが大幅に変更されて再計算が必要になったためです。
しかし、最も一般的なケースは、入力に “added” モードを使用し、出力に “modify” モードを使用することです。これらのモードはデフォルトで使用されます。これにより、新しく追加された行を入力データセットから取得し、それらを処理し、出力データセットに追加することができます。
出力に行を追加する代わりに、出力データセットにすでに存在する一部の既存の行を変更することがあります。そのためには、一般的なシナリオの例で示されているように、“replace” モードを使用します。
茶髪の学生のみを含むように学生をフィルター処理する増分変換を分析してみましょう:
Copied!1 2 3 4 5 6 7 8
@incremental() #インクリメンタル:変更されたデータのみを処理するデコレータ @transform( students=Input('/examples/students_hair_eye_color'), #students:入力データのパスを定義 processed=Output('/examples/hair_eye_color_processed') #processed:出力データのパスを定義 ) def filter_hair_color(students, processed): #filter_hair_color:髪の色がブラウンの学生をフィルターする関数 students_df = students.dataframe() #students_df:入力データをデータフレーム形式で読み込む processed.write_dataframe(students_df.filter(students_df.hair == 'Brown')) #processed.write_dataframe:髪の色がブラウンの学生のデータフレームを出力
例えば、/examples/students_hair_eye_color
入力データセットが新しい生徒のセットで完全に置き換えられたとします。ここで、新しい生徒のセットを前の出力結果に追加すると、出力データセットが誤っていることがわかります。これは、incremental()
デコレータがトランスフォームをインクリメンタルに実行しないことを決定する状況の例です。
トランスフォームがインクリメンタルに実行されるためには、以下の要件が満たされている必要があります。
トランスフォームに incremental()
デコレータがあるが、上記のいずれかの要件が満たされていない場合、トランスフォームは自動的に非インクリメンタルに実行されます。
これは、デフォルトの出力書き込みモードが modify
から replace
に設定され、入力が非インクリメンタルに提示されることを意味します。また、トランスフォームで出力を読み取ると、空のデータフレームが返されるため、前の履歴にアクセスできません。同様に、入力も非インクリメンタルに提示されます。require_incremental=True
を設定すると、トランスフォームは非インクリメンタルに実行されるのではなく、失敗します。
特定の入力を完全に書き換えても、トランスフォームをインクリメンタルに実行できるようにすることがよく望まれます。詳細については、スナップショット入力を参照してください。
require_incremental=True
引数を incremental
デコレータに渡すことで、トランスフォームをインクリメンタルに実行することができます(ただし、これまでに実行されたことがない場合やセマンティックバージョンが上がった場合を除く)。トランスフォームがインクリメンタルに実行できない場合は、非インクリメンタルに実行しようとせずに意図的に失敗します。
トランスフォームは、インクリメンタル入力がすべて、最後の実行以降にファイルが追加された状態(APPEND
または UPDATE
トランザクションを使用)である場合、インクリメンタルに実行できます。
逆に、インクリメンタル入力のいずれかが以下の状態の場合、トランスフォームはインクリメンタルに実行できません。
SNAPSHOT
トランザクションがあった場合UPDATE
または DELETE
トランザクションがあった場合。例えば、students_hair_eye_color
の生徒のリストが完全に変更された場合、フィルター処理された生徒の前の出力は無効であり、置き換える必要があります。
上流データセットが無限に増え続ける場合、古い行を削除することができます(Foundry Retention を使用)が、下流の計算のインクリメンタリティに影響を与えないように、そのデータセットに依存するインクリメンタルトランスフォームは、明示的に retained 入力を許可するように設定する必要があります。これは、transforms.api.incremental
デコレータの allow_retention
引数を使用して行うことができます。
True
に設定されている場合、Foundry Retention からの削除がインクリメンタリティの維持を評価する際に無視されます。つまり、Retention からの removed
入力はインクリメンタリティを損なわず、唯一の non-added
入力が retained 行の入力である場合でも、トランスフォームはインクリメンタルに実行されます。False
(デフォルト)の場合、入力データセットの任意の removed
タイプの変更によって、トランスフォームはスナップショットを実行します。Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# インクリメンタルデコレータ(データの保持を許可) @incremental(allow_retention=True) # トランスフォームデコレータ(入力・出力データの指定) @transform( # 入力データ: students students=Input('/examples/students_hair_eye_color'), # 出力データ: processed processed=Output('/examples/hair_eye_color_processed') ) # 眼の色をフィルタリングする関数 def filter_hair_color(students, processed): # studentsをデータフレームに変換 students_df = students.dataframe() # 眼の色が茶色のデータをフィルタリングしてprocessedに書き込む processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
上記の例では、データセット /examples/students_hair_eye_color
の変更セット後に変換が実行され、その変更セットには Foundry Retention を使用した 追加された
変更と 削除された
変更のみが含まれている場合、変換は増分的に実行されます。他の方法で行われた 削除された
変更や 修正された
変更が存在する場合、スナップショットがトリガーされます。
allow_retention=True
を指定しても、Foundry Retention から来る 削除された
変更の増分性への影響だけが防止されます。入力データセットの他の削除は依然として変換が増分計算の代わりにスナップショットを実行する原因となります。
入力が全面的に書き換えられることが許されるシナリオがあり、その場合でも変換の増分性は無効になりません。例えば、電話番号の国コードを国にマッピングする簡単な参照データセットを持っていて、これが定期的に書き換えられるとします。このデータセットへの変更は必ずしも以前の計算の結果を無効にするわけではないため、変換が増分的に実行されるのを防ぐべきではありません。
デフォルトでは、上記で説明したように、変換は最後に実行された後に任意の入力が全面的に書き換えられた場合、増分的に実行することはできません。スナップショット入力はこのチェックから除外され、実行間での開始トランザクションが異なってもよいとされます。
スナップショット入力は、incremental()
デコレーターの snapshot_inputs
引数を使用して設定することができます。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
@incremental(snapshot_inputs=['country_codes']) @transform( phone_numbers=Input('/examples/phone_numbers'), country_codes=Input('/examples/country_codes'), output=Output('/examples/phone_numbers_to_country') ) def map_phone_number_to_country(phone_numbers, country_codes, output): # type: (TransformInput, TransformInput, TransformOutput) -> None # 前回の実行以降に見つかったすべての新しい電話番号です phone_numbers = phone_numbers.dataframe() # 以前に見たものに関係なく、すべての国コードです country_codes = country_codes.dataframe() # 電話番号の国コードが国コードのコードと一致する条件です cond = [phone_numbers.country_code == country_codes.code] # 左外部結合を使用して、電話番号と国コードを結合し、結果を出力します output.write_dataframe(phone_numbers.join(country_codes, on=cond, how='left_outer'))
スナップショット入力の動作は、変換が増分的に実行される場合と非増分的に実行される場合とで同一です。そのため、added
および current
読み取りモードは常にデータセット全体を返します。他のすべての読み取りモードは空のデータセットを返します。
スナップショット入力の以前に見たバージョンに関して制約がないため、スナップショット入力を追加または削除しながら、変換を増分的に実行する能力を保持することが可能です。入力の変更が変換のセマンティクスを根本的に変更する場合、incremental()
デコレーターの semantic_version
引数を更新するかどうかを再確認する価値があります。
既存の入力リストは変更可能です。次の場合に増分性が保持されます:
また、非スナップショット入力データセットの各開始トランザクションが、前回の実行で使用されたものと一致する必要があります。
複数出力の増分的な変換の場合、それぞれの出力に対する最後にコミットされたトランザクションは、同じ変換から生成されていなければなりません。
変換は、すべての増分入力がファイルの追加のみであった場合、またはファイルが削除された場合には、それらのファイルが Foundry Retention で allow_retention=True
を使用して削除された場合にのみ、増分的に実行できます。スナップショット入力はこのチェックから除外されます。