ドキュメントの検索
karat

+

K

APIリファレンス ↗

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

インクリメンタル変換リファレンス

インクリメンタルデコレーター

Tip

このガイドの残りの部分では、インクリメンタルビルドと非インクリメンタルビルドについて参照します。すべての場合で、 incremental() デコレーターが使用されていると仮定されます。したがって、この用語は、変換が実際にインクリメンタルに実行されるかどうかを指します。

incremental() デコレーターは、インクリメンタル計算を有効にするロジックで変換の計算機能をラップするために使用できます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 from transforms.api import transform, incremental, Input, Output @incremental() @transform( students=Input('/examples/students_hair_eye_color'), # studentsというInputパスを設定します processed=Output('/examples/hair_eye_color_processed') # processedというOutputパスを設定します ) def filter_hair_color(students, processed): # type: (IncrementalTransformInput, IncrementalTransformOutput) -> None students_df = students.dataframe('added') # 追加されたデータをデータフレームとして取得します processed.write_dataframe(students_df.filter(students_df.hair == 'Brown')) # 髪の色がブラウンの学生のみをフィルタリングして保存します

incremental()デコレータは、transform(), transform_df(), transform_pandas()デコレータを使用する既存の変換に対して使用できます。ただし、ユーザーの変換のための計算関数は、インクリメンタルかつ非インクリメンタルで実行できるようにサポートされている必要があります。「incremental()」デコレータは、2つの主要なことを行います。

  • 変換が前回のビルドに関する情報を参照できるようにします。この情報を使用して、「incremental()」デコレータは、変換が以下で説明されている要件に従ってインクリメンタルに実行できるかどうかを判断します。
  • 入力、出力、およびコンテキストオブジェクトを、追加機能を提供するインクリメンタルなサブクラスに変換します。具体的には、TransformInputIncrementalTransformInputになり、TransformOutputIncrementalTransformOutputになり、TransformContextIncrementalTransformContextになります。これらのインクリメンタルオブジェクトは、デコレータでラップされた変換に渡されます。

インクリメンタルデコレータは5つの引数を取ります。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 # transforms.api.incrementalは、データ変換の実行を制御するための関数です。 transforms.api.incremental( # require_incrementalは、インクリメンタルな実行を必要とするかどうかを指定します。Falseに設定すると、常にフルロードが強制されます。 require_incremental=False, # semantic_versionは、変換のバージョンを指定します。ここでは1を指定しています。 semantic_version=1, # snapshot_inputsは、スナップショット入力を指定します。Noneを指定すると、すべての入力がスナップショットになります。 snapshot_inputs=None, # allow_retentionは、保存ポリシーを許可するかどうかを指定します。Falseに設定すると、保存ポリシーは無視されます。 allow_retention=False, # strict_appendは、厳密な追加を行うかどうかを指定します。Falseに設定すると、レコードは変更されずに追加されます。 strict_append=False)

require_incremental 引数を True に設定すると、インクリメンタル実行ができない場合、変換が失敗します。require_incremental=True の場合でも、スナップショットとして実行が許可されるケースが 2 つあります。

  1. 出力がまだ一度も生成されていない場合。
  2. セマンティックバージョンが変更され、スナップショットが明示的に要求された場合。

インクリメンタル実行ができない原因を調べるには、ドライバーログの警告transforms.api._incremental: Not running incrementallyを確認してください。

:func:~transforms.api.incremental デコレーターの semantic_version 引数は、次の変換の実行を非インクリメンタルにすることができます。

  • 現在の実行のセマンティックバージョンが前回の実行のセマンティックバージョンと異なる場合、変換は非インクリメンタルで実行されます。
  • 指定されていない場合、セマンティックバージョンは 1 に設定されます。
  • 前回の実行のセマンティックバージョンが存在しない場合(既存の変換をインクリメンタル変換に変換する場合など)、値 1 が想定されます。これにより、新しいスナップショットを必要とせずに、変換をインクリメンタルで実行を開始できます。
  • 変換の次回の実行を非インクリメンタルにするには、@incremental() デコレーターの semantic_version 引数を上げます。
    • セマンティックバージョンを上げる際には、整数のみを使用してください。

snapshot_inputs 引数は、スナップショット入力としていくつかの入力を定義することができ、これらの入力は、非スナップショット入力とは異なり、更新や削除の変更をサポートします。詳細については、スナップショット入力 を参照してください。

allow_retention 引数を true に設定すると、Foundry Retention による入力および出力データセット内のファイルの削除が許可され、変換のインクリメンタル性が維持されます。

strict_append パラメーターが true に設定されている場合、基礎となる Foundry トランザクション のタイプは APPEND に設定され、インクリメンタルな書き込みに APPEND トランザクションが使用されます。書き込み操作では、Parquet のサマリーメタデータや Hadoop の SUCCESS ファイルなどの補助ファイルも含め、どのファイルも上書きされないことに注意してください。すべての Foundry 形式は、このモードをサポートするはずです。

重要な情報

上記で説明したように、incremental() デコレーターでラップされた変換の計算関数は、インクリメンタル実行と非インクリメンタル実行の両方をサポートする必要があります。デフォルトの読み込みモードと書き込みモード(このページの後半で詳しく説明されています)は、この二重ロジック要件をサポートするのに役立ちますが、計算コンテキストis_incremental プロパティに基づいて分岐する必要があるかもしれません。

もうひとつの重要な点は、incremental() デコレーターを transform_df() または transform_pandas() と一緒に使用すると、デフォルトの読み書きモードのみが利用可能になることです。これは、追加された入力行のみに関数の追加出力行がある変換(追加の例 を参照)に対しては十分です。ただし、入力読み込みモードや出力書き込みモードを設定する必要があるより複雑なロジック(結合、集計、重複排除など)を実行する変換がある場合は、incremental() デコレーターを transform() と一緒に使用する必要があります。インクリメンタルデコレーターを transform() と一緒に使用することで、読み書きモードを設定できます。

警告

コードリポジトリのプレビュー機能では、すべての変換が非インクリメンタルモードで実行されることに注意してください。これは、incremental() デコレーターに require_incremental=True が渡された場合でも同様です。

入力と出力のインクリメンタルモード

IncrementalTransformInput

transforms.api.IncrementalTransformInput オブジェクトは、オプションの読み込みモードで dataframe() メソッドを拡張します。

オプションの入力読み込みモードパラメーターは、transform()デコレーターを使用している場合にのみ利用できます。transform_df() および transform_pandas() デコレーターは、引数なしで入力の dataframe() および pandas() を呼び出し、PySpark および Pandas DataFrame オブジェクトを抽出します。これは、読み込みモードが常にデフォルトの added モードであることを意味します。

インクリメンタルデコレーターを使用して変換を定義する場合、変換がインクリメンタル実行されるか非インクリメンタル実行されるかによって、読み込みモードの動作が異なります。

読み込みモードインクリメンタル動作非インクリメンタル動作
added *変換が前回実行されてから追加された入力の新しい行を含む DataFrame を返します。すべての行が未見と見なされるため、データセット全体を含む DataFrame を返します。
previous変換が前回実行されたときに与えられた入力全体を含む DataFrame を返します。空の DataFrame を返します。
current現在の実行の入力データセット全体を含む DataFrame を返します。現在の実行の入力データセット全体を含む DataFrame を返します。これは added と同じになります。

デフォルトの読み込みモードは added です。

インクリメンタル方式で扱われることが望ましくない入力がある場合がありますが、変換が incremental() としてマークされている場合があります。このような入力の詳細や、読み込みモードの動作がこれらのタイプの入力にどのように異なるかについては、スナップショット入力 を参照してください。

出力のデフォルトの読み込みモードは current であり、利用可能な出力の読み込みモードは addedcurrent、および previous です。出力の読み込みモードの詳細については、以下のセクションを参照してください。

インクリメンタル変換の性質上、前回の SNAPSHOT トランザクションから入力データセットの過去のすべてのトランザクションを読み込んで入力ビューを構築します。インクリメンタル変換で段階的な遅延が見られるようになった場合は、インクリメンタル入力データセットで SNAPSHOT ビルドを実行することをお勧めします。

IncrementalTransformOutput

transforms.api.IncrementalTransformOutput オブジェクトは、出力データセットの読み込みモードと書き込みモードにアクセスできます。出力データセットのデフォルトの書き込みモードである modify を使用して、インクリメンタルビルドと非インクリメンタルビルドの両方と互換性のあるロジックを書くことが重要です。書き込みモードは 2 つあります。

  • modify:このモードでは、ビルド中に書き込まれたデータで既存の出力を変更します。たとえば、出力が modify モードにある場合、write_dataframe() を呼び出すと、書き込まれた DataFrame が既存の出力に追加されます。
  • replace:このモードでは、ビルド中に書き込まれたデータで出力を完全に置き換えます。

変換がインクリメンタルに実行されるということは、出力のデフォルトの書き込みモードが modify に設定されていることを意味します。同様に、変換が非インクリメンタルに実行されるということは、出力のデフォルトの書き込みモードが replace に設定されていることを意味します。

入力 DataFrame のデフォルトの読み込みモードが added であることを思い出してください。added のデフォルトの入力読み込みモードと出力のデフォルトの書き込みモードである modify のおかげで、インクリメンタルビルドと非インクリメンタルビルドの両方と互換性のあるロジックを書くことがはるかに簡単になります。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @incremental() @transform( students=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def incremental_filter(students, processed): # type: (IncrementalTransformInput, IncrementalTransformOutput) -> None # まだ見ていない行のみを読みます。 new_students_df = students.dataframe() # これは students.dataframe('added') と同等です # 非インクリメンタルの場合、すべての行を読み出力を置き換えます。 # インクリメンタルの場合、新しい行のみを読み、それらを出力に追加します。 processed.write_dataframe( new_students_df.filter(new_students_df.hair == 'Brown') # ヘアカラーがブラウンの生徒だけをフィルタリングします )

増分計算のより複雑な使用例では、正しい書き込みモードを計算し、手動で設定する必要があるかもしれません。これは、増分出力で set_mode() メソッドを使用して行うことができます。

出力書き込みモードは、transform() デコレーターを使用している場合に限り、手動で設定することができます。このデコレーターを使用すると、出力を保存する前に set_mode() を使用できます。一方、transform_df() および transform_pandas() デコレーターは、write_dataframe() および write_pandas() を呼び出して DataFrame 出力を保存します。これは、使用される書き込みモードが incremental() デコレーターによって決定されることを意味します。

警告

set_mode() を使用する場合、変換が増分または非増分で実行される場合にこれが有効な動作であることを確認することが重要です。そうでない場合は、is_incremental プロパティを使用する必要があります。

書き込みモードに加えて、transforms.api.IncrementalTransformOutput では、出力データセットから DataFrame を読み込むことができます。これは、オプションの読み込みモードを引数に取る dataframe() メソッドを使用して行うことができます。デフォルトの読み込みモードは current に設定されており、他の利用可能な出力読み込みモードは added および previous です。読み込みモードは、データセットの書き込みモードが何に設定されているかによって異なる動作をします。

ヒント

デフォルトの読み込みモードは current ですが、ほとんどの場合、実際には previous を使用したいです。他の読み込みモードは、それに書き込んだ後のデータセットを読み込むために使用されるべきです。

前回の実行からデータを読み込む、有効な組み合わせ:

前回の出力からデータを読み込むには、変換を増分モードで実行する必要があります(ctx.is_incremental is True)。そうでない場合、dataframe は空になります。

出力読み込みモード出力書き込みモード新しいデータがすでに書き込まれていますか?動作
currentmodifyいいえdataframe() は変換の前回の出力を返します。
currentmodifyはいdataframe() は変換の前回の出力と現在実行中のビルドで出力に書き込まれたデータを返します。
currentreplaceいいえこれらの設定は無効であり、予期しない動作を引き起こす可能性があります。異なるスキーマを持つ新しい入力で前回の出力をマージして置き換えたい場合は、スキーマ変更とマージおよび置換の例を参照してください。
currentreplaceはいdataframe() は現在実行中のビルドで出力に書き込まれたデータを返します。
addedmodify/replaceいいえこれらの設定の使用例はありません。代わりに previous モードを使用してください。
addedmodify/replaceはいdataframe() は現在実行中のビルドで出力に書き込まれたデータを返します。
previousmodifyはい/いいえdataframe() は変換の前回の出力を返します。スキーマは previous で読み込む際の必須フィールドです。
previousreplaceはい/いいえdataframe() は変換の前回の出力を返します。スキーマは previous で読み込む際の必須フィールドです。

前回の dataframe を取得するために current を使用する場合、schema を提供する必要はありません。これは、current がすでに構築されているはずの出力のスキーマを使用するからです。 しかし、current モードは previous よりも脆弱です。current モードは次の場合に失敗します:

  • 変換が非増分で実行され、出力で dataframe を呼び出す前に write_modemodify にオーバーライドしなかった場合
  • 変換がまだ計算されていないため、スキーマが不明な空の DataFrame を構築できない場合。

前回の dataframe を読み込むときに提供する スキーマ は、最後の出力の実際のスキーマと比較されます。行の型、行の nullability、または行の順序が一致しない場合、例外が発生します。行の順序が同じままであることを確認するために、次の構造を使用してください:

Copied!
1 2 3 4 5 # schemaはpyspark.sql.types.StructTypeです previous = out.dataframe('previous', schema) # 以前のデータフレームを取得 # データフレームを選択して書き込みます out.write_dataframe(df.select(schema.fieldNames()))

Foundry は、ユーザーの変換で使用されるスキーマに関係なく、すべての行を nullable として保存します。その結果、一部のフィールドが non-nullable に設定されたスキーマを提供すると、 previous モードで出力から読み取る際に SchemaMismatchError が発生し、ビルドが失敗します。

詳しくは、Merge and replace の例をご覧ください。

現在のランで書き込まれたデータを読み取る、有効な組み合わせ:

Output Read ModeOutput Write Mode新しいデータがすでに書き込まれていますか?
current または addedmodify / replaceはい

added を使用することをおすすめします。これによりユーザーの意図が明確になります。 現在の変換によって書き込まれたデータを読み取ることが有益なシナリオは、データに対してチェックを実行し、チェックがパスしない場合はビルドを失敗させることです。これにより、データを再計算する必要も、チェックを実行するために Spark にキャッシュする必要もありません。

IncrementalTransformContext

TransformContext オブジェクトと比較して、 IncrementalTransformContext オブジェクトは追加のプロパティ: is_incremental を提供します。このプロパティは、変換が増分的に実行される場合に True に設定され、次の意味を持ちます:

  • デフォルトの出力書き込みモードが modify に設定されている、および
  • 入力のデフォルトの読み取りモードが added に設定されています。

増分モードの概要

増分デコレーターを使用すると、読み取りモード “previous” を指定することで変換の前回の入力と出力にアクセスできます。これにより、現在のビルドを歴史的なコンテキストに基づいて行うことができます。変換がスナップショットモードで実行されると、“previous” データフレームは空になります。これは、これが最初の実行であるか、ロジックやデータが大幅に変更されて再計算が必要になったためです。

しかし、最も一般的なケースは、入力に “added” モードを使用し、出力に “modify” モードを使用することです。これらのモードはデフォルトで使用されます。これにより、新しく追加された行を入力データセットから取得し、それらを処理し、出力データセットに追加することができます。

出力に行を追加する代わりに、出力データセットにすでに存在する一部の既存の行を変更することがあります。そのためには、一般的なシナリオの例で示されているように、“replace” モードを使用します。

増分計算の要件

茶髪の学生のみを含むように学生をフィルター処理する増分変換を分析してみましょう:

Copied!
1 2 3 4 5 6 7 8 @incremental() #インクリメンタル:変更されたデータのみを処理するデコレータ @transform( students=Input('/examples/students_hair_eye_color'), #students:入力データのパスを定義 processed=Output('/examples/hair_eye_color_processed') #processed:出力データのパスを定義 ) def filter_hair_color(students, processed): #filter_hair_color:髪の色がブラウンの学生をフィルターする関数 students_df = students.dataframe() #students_df:入力データをデータフレーム形式で読み込む processed.write_dataframe(students_df.filter(students_df.hair == 'Brown')) #processed.write_dataframe:髪の色がブラウンの学生のデータフレームを出力

例えば、/examples/students_hair_eye_color 入力データセットが新しい生徒のセットで完全に置き換えられたとします。ここで、新しい生徒のセットを前の出力結果に追加すると、出力データセットが誤っていることがわかります。これは、incremental()デコレータがトランスフォームをインクリメンタルに実行しないことを決定する状況の例です。 トランスフォームがインクリメンタルに実行されるためには、以下の要件が満たされている必要があります。

トランスフォームに incremental() デコレータがあるが、上記のいずれかの要件が満たされていない場合、トランスフォームは自動的に非インクリメンタルに実行されます。 これは、デフォルトの出力書き込みモードが modify から replace に設定され、入力が非インクリメンタルに提示されることを意味します。また、トランスフォームで出力を読み取ると、空のデータフレームが返されるため、前の履歴にアクセスできません。同様に、入力も非インクリメンタルに提示されます。require_incremental=True を設定すると、トランスフォームは非インクリメンタルに実行されるのではなく、失敗します。

特定の入力を完全に書き換えても、トランスフォームをインクリメンタルに実行できるようにすることがよく望まれます。詳細については、スナップショット入力を参照してください。

Tip

require_incremental=True 引数を incremental デコレータに渡すことで、トランスフォームをインクリメンタルに実行することができます(ただし、これまでに実行されたことがない場合やセマンティックバージョンが上がった場合を除く)。トランスフォームがインクリメンタルに実行できない場合は、非インクリメンタルに実行しようとせずに意図的に失敗します。

追加のみの入力変更

トランスフォームは、インクリメンタル入力がすべて、最後の実行以降にファイルが追加された状態(APPEND または UPDATE トランザクションを使用)である場合、インクリメンタルに実行できます。

逆に、インクリメンタル入力のいずれかが以下の状態の場合、トランスフォームはインクリメンタルに実行できません。

  • 完全に書き換えられた状態であること、例えば、SNAPSHOT トランザクションがあった場合
  • 更新されたファイルや削除されたファイルがある状態であること、例えば、UPDATE または DELETE トランザクションがあった場合。

例えば、students_hair_eye_color の生徒のリストが完全に変更された場合、フィルター処理された生徒の前の出力は無効であり、置き換える必要があります。

Foundry Retention からの削除がある入力

上流データセットが無限に増え続ける場合、古い行を削除することができます(Foundry Retention を使用)が、下流の計算のインクリメンタリティに影響を与えないように、そのデータセットに依存するインクリメンタルトランスフォームは、明示的に retained 入力を許可するように設定する必要があります。これは、transforms.api.incremental デコレータの allow_retention 引数を使用して行うことができます。

  • このフィールドが True に設定されている場合、Foundry Retention からの削除がインクリメンタリティの維持を評価する際に無視されます。つまり、Retention からの removed 入力はインクリメンタリティを損なわず、唯一の non-added 入力が retained 行の入力である場合でも、トランスフォームはインクリメンタルに実行されます。
  • フィールドが False(デフォルト)の場合、入力データセットの任意の removed タイプの変更によって、トランスフォームはスナップショットを実行します。
Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 # インクリメンタルデコレータ(データの保持を許可) @incremental(allow_retention=True) # トランスフォームデコレータ(入力・出力データの指定) @transform( # 入力データ: students students=Input('/examples/students_hair_eye_color'), # 出力データ: processed processed=Output('/examples/hair_eye_color_processed') ) # 眼の色をフィルタリングする関数 def filter_hair_color(students, processed): # studentsをデータフレームに変換 students_df = students.dataframe() # 眼の色が茶色のデータをフィルタリングしてprocessedに書き込む processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))

上記の例では、データセット /examples/students_hair_eye_color の変更セット後に変換が実行され、その変更セットには Foundry Retention を使用した 追加された 変更と 削除された 変更のみが含まれている場合、変換は増分的に実行されます。他の方法で行われた 削除された 変更や 修正された 変更が存在する場合、スナップショットがトリガーされます。

警告

allow_retention=True を指定しても、Foundry Retention から来る 削除された 変更の増分性への影響だけが防止されます。入力データセットの他の削除は依然として変換が増分計算の代わりにスナップショットを実行する原因となります。

スナップショット入力

入力が全面的に書き換えられることが許されるシナリオがあり、その場合でも変換の増分性は無効になりません。例えば、電話番号の国コードを国にマッピングする簡単な参照データセットを持っていて、これが定期的に書き換えられるとします。このデータセットへの変更は必ずしも以前の計算の結果を無効にするわけではないため、変換が増分的に実行されるのを防ぐべきではありません。

デフォルトでは、上記で説明したように、変換は最後に実行された後に任意の入力が全面的に書き換えられた場合、増分的に実行することはできません。スナップショット入力はこのチェックから除外され、実行間での開始トランザクションが異なってもよいとされます。

スナップショット入力は、incremental() デコレーターの snapshot_inputs 引数を使用して設定することができます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @incremental(snapshot_inputs=['country_codes']) @transform( phone_numbers=Input('/examples/phone_numbers'), country_codes=Input('/examples/country_codes'), output=Output('/examples/phone_numbers_to_country') ) def map_phone_number_to_country(phone_numbers, country_codes, output): # type: (TransformInput, TransformInput, TransformOutput) -> None # 前回の実行以降に見つかったすべての新しい電話番号です phone_numbers = phone_numbers.dataframe() # 以前に見たものに関係なく、すべての国コードです country_codes = country_codes.dataframe() # 電話番号の国コードが国コードのコードと一致する条件です cond = [phone_numbers.country_code == country_codes.code] # 左外部結合を使用して、電話番号と国コードを結合し、結果を出力します output.write_dataframe(phone_numbers.join(country_codes, on=cond, how='left_outer'))

スナップショット入力の動作は、変換が増分的に実行される場合と非増分的に実行される場合とで同一です。そのため、added および current 読み取りモードは常にデータセット全体を返します。他のすべての読み取りモードは空のデータセットを返します。

スナップショット入力の以前に見たバージョンに関して制約がないため、スナップショット入力を追加または削除しながら、変換を増分的に実行する能力を保持することが可能です。入力の変更が変換のセマンティクスを根本的に変更する場合、incremental() デコレーターの semantic_version 引数を更新するかどうかを再確認する価値があります。

入力への変更

既存の入力リストは変更可能です。次の場合に増分性が保持されます:

  • 新しい入力または新しいスナップショット入力が追加された場合、または
  • 既存の入力または既存のスナップショット入力が削除された場合。 増分的な変換には少なくとも1つの入力が必要であることに注意してください。

また、非スナップショット入力データセットの各開始トランザクションが、前回の実行で使用されたものと一致する必要があります。

同じ変換によって最後に構築された出力

複数出力の増分的な変換の場合、それぞれの出力に対する最後にコミットされたトランザクションは、同じ変換から生成されていなければなりません。

増分計算の要件の要約

変換は、すべての増分入力がファイルの追加のみであった場合、またはファイルが削除された場合には、それらのファイルが Foundry Retention で allow_retention=True を使用して削除された場合にのみ、増分的に実行できます。スナップショット入力はこのチェックから除外されます。