ドキュメントの検索
karat

+

K

APIリファレンス ↗
データ統合Pythonインクリメンタル変換
Feedback

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

このセクションでは、インクリメンタルに計算可能なトランスフォームの幅広い例を提供します:

例では、インクリメンタルな計算を示すために2つの入力、studentsstudents_updatedを使用します。students入力には3人の学生が含まれており、これはインクリメンタルではありません。つまり、履歴がありません:

>>> students.dataframe('previous').sort('id').show()
+---+----+---+---+
| id|hair|eye|sex|
+---+----+---+---+
+---+----+---+---+
>>>
>>> students.dataframe('current').sort('id').show()
+---+-----+-----+------+
| id| hair|  eye|   sex|
+---+-----+-----+------+
|  1|Brown|Green|Female|
|  2|  Red| Blue|  Male|
|  3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> students.dataframe('added').sort('id').show()
+---+-----+-----+------+
| id| hair|  eye|   sex|
+---+-----+-----+------+
|  1|Brown|Green|Female|
|  2|  Red| Blue|  Male|
|  3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> # 入力のデフォルトの読み取りモードは 'added' であることを思い出してください
>>> students.dataframe('added') is students.dataframe()
True

students_updated 入力は、students と同じですが、追加の更新が含まれており、3 人の追加学生が含まれています。この更新により、入力がインクリメンタルになります。したがって、空でない previous DataFrame を持っています。

>>> # 'previous'という名前のデータフレームを取得し、'id'でソートして表示します。
>>> students_updated.dataframe('previous').sort('id').show()
+---+-----+-----+------+
| id| hair|  eye|   sex|
+---+-----+-----+------+
|  1|Brown|Green|Female|
|  2|  Red| Blue|  Male|
|  3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> # 'current'という名前のデータフレームを取得し、'id'でソートして表示します。
>>> students_updated.dataframe('current').sort('id').show()
+---+-----+-----+------+
| id| hair|  eye|   sex|
+---+-----+-----+------+
|  1|Brown|Green|Female|
|  2|  Red| Blue|  Male|
|  3|Blond|Hazel|Female|
|  4|Brown|Green|Female|
|  5|Brown| Blue|  Male|
|  6|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> # 'added'という名前のデータフレームを取得し、'id'でソートして表示します。
>>> students_updated.dataframe('added').sort('id').show()
+---+-----+-----+------+
| id| hair|  eye|   sex|
+---+-----+-----+------+
|  4|Brown|Green|Female|
|  5|Brown| Blue|  Male|
|  6|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> # 入力のデフォルトの読み取りモードが 'added' であることを思い出してください
>>> # 'added'という名前のデータフレームは、名前を指定せずにデータフレームを取得するのと同じです。
>>> students_updated.dataframe('added') is students_updated.dataframe()
True

追加

追加のみのインクリメンタル計算とは、追加された出力行が追加された入力行のみの関数である計算のことです。これは、変換が以下の操作を行うことを意味します。

  • 新しく追加された入力データを見る
  • これらの追加された入力行のみの関数である新しい出力行を計算し、
  • 既存の出力に新しい出力を追加する

列タイプの変更、日付の文字列としての書式設定、およびフィルタリングは、すべて追加のみの計算の例です。これらの例では、追加された入力行が変換されたり削除されたりして、出力行が生成されます。

追加のみの変換をインクリメンタルにする唯一の違いは、incremental() デコレーションです。

インクリメンタルに実行すると、デフォルトの読み取りモードである added は、変換が新しい学生のみを読み取ることを意味し、デフォルトの書き込みモードである modify は、変換がフィルター処理された新しい学生のみを出力に追加することを意味します。

インクリメンタルでない場合、デフォルトの読み取りモードである added は、変換が完全な入力を読み取ることを意味し、デフォルトの書き込みモードである replace は、変換がフィルター処理された学生の完全なセットで出力を置き換えることを意味します。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from transforms.api import transform, incremental, Input, Output # インクリメンタル変換 @incremental() # 入力と出力の指定 @transform( students=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) # 関数定義 def incremental_filter(students, processed): # 新しい学生データフレームを作成 new_students_df = students.dataframe() # 処理済みデータに新しいデータフレームを書き込む (髪の色が茶色のものだけフィルタリング) processed.write_dataframe( new_students_df.filter(new_students_df.hair == 'Brown') )

マージと追加

トランスフォームが前回の出力結果を参照して、インクリメンタルに更新を計算する必要がある場合があります。例として、distinct() メソッドがあります。 トランスフォームで重複した行を削除するには(現在の出力が正しいと仮定して)、入力内の新しい行の重複を削除し、それらの行が出力に存在しないことを確認する必要があります。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 @incremental() # インクリメンタル。このデコレータは、処理が途中で中断された場合でも、再開時に途中から処理を再開できるようにする機能を提供します。 @transform( # このデコレータは、入力データを加工するための関数を定義します。 students=Input('/examples/students_hair_eye_color'), # 入力データ。ここでは学生の髪の色と瞳の色のデータを指定しています。 processed=Output('/examples/hair_eye_color_processed') # 出力データ。ここでは加工後のデータを指定しています。 ) def incremental_distinct(students, processed): # incremental_distinctという名前の関数を定義します。この関数は、入力データから重複を除いたデータを作成し、それを出力データとして書き出します。 new_students_df = students.dataframe() # 入力データをデータフレームとして読み込みます。 processed.write_dataframe( # 処理後のデータフレームを書き出します。 new_students_df.distinct().subtract( # 入力データから重複を除いたデータを作成し、 processed.dataframe('previous', schema=new_students_df.schema) # 前回の処理結果と比較して新たに追加されたデータのみを取り出します。 ) )

ここでは、出力データセットの previous 読み取りモードを使用します。これにより、最後のビルド時に出力された DataFrame が返されます。 previous 出力が存在しない可能性があるため、空の DataFrame を正しく構築するために dataframe('previous') 呼び出しにスキーマを提供する必要があります。

このコードを実行すると、出力データセットのスキーマはデータから自動的に推測されます。これには、列の名前、型、"nullability"(参照 StructField)、および列の順序の自動検出が含まれます。ビルドの信頼性を確保するために、Spark の推測に依存するのではなく、dataframe の予想されるスキーマをハードコードすることがベストプラクティスです。

マージと置換

全体の出力を常に置換する一部の変換があります。しかし、これらの変換は、増分計算から依然として利益を得ることができます。その一例が統計の集計です。例えば、行における各異なる値が何回発生するかをカウントするなどです。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from pyspark.sql import functions as F @incremental() @transform( students=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def incremental_group_by(students, processed): # 新しい学生のみの髪の色のカウントを計算します。 new_hair_counts_df = students.dataframe().groupBy('hair').count() # 古いカウントと統合します out_schema = new_hair_counts_df.schema all_counts_df = new_hair_counts_df.union( processed.dataframe('previous', schema=out_schema) ) # 髪の色でグループ化し、2つのカウントの合計を計算します。 totals_df = all_counts_df.groupBy('hair').agg(F.sum('count').alias('count')) # 出力を完全に置き換えるため、出力モードは常に'replace'に設定します。 # 出力モードを変更する前に、トータルデータフレームのチェックポイントを作成します。 totals_df.localCheckpoint(eager=True) processed.set_mode('replace') processed.write_dataframe(totals_df.select(out_schema.fieldNames()))

再度説明しますが、previous 出力が存在しない場合があるため、空の DataFrame を正しく構築できるように dataframe('previous') コールにスキーマを提供する必要があります。

スキーマ変更でマージして置き換える

場合によっては、スキーマが変更される入力でデータセットをインクリメンタルに更新する必要があります。これは、例えば、入力が時間経過で列が追加されたり削除されたりするソーステーブルから来る場合です。この場合、古いスキーマで前の出力を読み込んで、新しいスキーマと手動で調整する必要があります。

また、この変換が SNAPSHOT モードで実行される場合、つまり前の出力がない場合の挙動を定義する必要があります。この場合、processed.dataframe('current') を呼び出すと失敗します。

以下の例は、新しい students データセットのスキーマが異なる場合でも、既存の students_processed データセットに新しい students を追加する方法を示しています。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from pyspark.sql import functions as F @incremental( snapshot_inputs=['students'] ) @transform( students=Input('/examples/students_raw'), # このデータセットのスキーマは変更される可能性があります。これはSNAPSHOTトランザクションでなければならないことを意味しますので、"snapshot_inputs"パラメータにそれを含めます processed=Output('/examples/students_processed') ) def incremental_group_by(students, processed, ctx): if not ctx.is_incremental: # このケースは独立して処理する必要があります # ... return # 旧プロセスデータフレームとその関連スキーマを読み込む # まだプロセスに書き込んでいないため、'current'は前のトランザクションを提供します # 注:ここで重要なのは、この時点での'processed'の書き込みモードがまだ'modify'であることです(以下の警告を参照) students_previous = processed.dataframe('current').localCheckpoint() # 旧データフレームと新データフレームをマージし、欠けている列をnullに設定する students_all = students_previous.unionByName(students.dataframe(), allowMissingColumns=True) # 新しい結合データフレームを書き出す processed.set_mode('replace') processed.write_dataframe(students_all)
警告

.dataframe()呼び出しによる読み取りモードの評価は遅延(必要に応じて)されることに注意してください。つまり、その値が必要となるまで評価が遅延されます。データフレームの読み取りの出力は、write_dataframe呼び出し中のデータセットの書き込みモードに従って評価され、以前の書き込みモードは無視されます。.localCheckpoint(eager=True)を呼び出すと、その時点でデータが読み込まれ、出力書き込みモードが評価され、再計算されることはありません。

大規模なデータセットの結合にインクリメンタルトランスフォームを活用する

ユーザーが提出した Ordersと完了した Deliveriesという2つのテーブルがあるとしましょう - そして、商品が配達されるまでの時間を計算する DeliveryDurationというテーブルを計算したいと思います。OrdersDeliveriesの両テーブルには新しい行が追加されるだけで、行が変更されることはないにもかかわらず、2つのインクリメンタルデータセット間での単純な結合は機能しません。例えば、Ordersテーブルには、まだ Deliveriesテーブルに存在しない orderIdsが含まれている可能性があります。

Orders:                               Deliveries:
+---------+---------------+           +---------+--------------+           +---------+------------------+
| orderId | submittedDate |           | orderId | deliveryDate |           | orderId | deliveryDuration |
+---------+---------------+           +---------+--------------+   ---->   +---------+------------------+
| 1001    | 2019-08-21    |  join on  | 1001    | 2019-08-23   |           | 1001    | 2                |
+---------+---------------+  orderId  +---------+--------------+           +---------+------------------+
| 1002    | 2019-08-22    |           // 注文IDで結合する
+---------+---------------+
| 1003    | 2019-08-23    |
+---------+---------------+

orderIdOrdersテーブルとDeliveriesテーブルの両方で厳密に増加していると仮定すると、deliveryDurationを計算した最後のorderIdmaxComputedOrderId)を調べて、OrdersテーブルとDeliveriesテーブルからorderIdmaxComputedOrderIdよりも大きい行だけを取得することができます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 from transforms.api import transform, Input, Output, incremental from pyspark.sql import types as T from pyspark.sql import functions as F @incremental(snapshot_inputs=['orders', 'deliveries']) @transform( orders=Input('/example/Orders'), deliveries=Input('/example/Deliveries'), delivery_duration=Output('/example/New_Delivery_Date') ) def compute_delivery_duration(orders, deliveries, delivery_duration): def to_fields(datatype, names, nullable=True): # フィールド生成関数 return [T.StructField(n, datatype, nullable) for n in names] # deliveryDurationのためのスキーマを生成 fields = to_fields(T.IntegerType(), ['orderId', 'deliveryDuration']) # 前のバージョンのスキーマを参照できないので、スキーマを明示的に定義 maxComputedOrderId = ( delivery_duration .dataframe('previous', schema=T.StructType(fields)) .groupby() .max('orderId') .collect()[0][0] ) # 最初のイテレーションでは、maxComputedOrderIdは空です。なぜなら、delivery_durationデータセットがまだ存在しないからです。 if maxComputedOrderId == None: maxComputedOrderId = 0 ordersNotProcessed = orders.dataframe().filter(F.col('orderId') > maxComputedOrderId) deliveriesNotProcessed = deliveries.dataframe().filter(F.col('orderId') > maxComputedOrderId) newDurations = ( ordersNotProcessed .join(deliveriesNotProcessed, 'orderId', how='left') .withColumn('deliveryDuration', F.datediff(F.col('deliveryDate'), F.col('submittedDate'))) .drop(*['submittedDate', 'deliveryDate']) ) # 結果を書き込み delivery_duration.write_dataframe(newDurations)

スキーマまたはロジック変更の処理

例えば、これからインクリメンタルなデータセットに別の行を追加したいと考えてみましょう。出力に別の行を追加しても、is_incremental フラグは無効にならないため、次回の実行では新しい行が計算され、新しい行とともにデータが書き込まれ、この行は以前に書き込まれたすべての行で null になります。

しかし、以前の行に対しても行を埋めたい場合もあります。変換の semantic_version を増やすと、一度だけ非インクリメンタルに実行され、"added" の読み取りモードを使用している場合、入力には新しい行を追加して再計算するためのすべてのデータが含まれます。

変換がスナップショット入力からの歴史的データセットを作成していた場合、少し複雑になります。以前のデータは入力のスナップショットトランザクションのスタックにあるためです。この場合、Palantir の担当者に連絡してください。

この例では新しい行を追加することを考えましたが、上記の理由はあらゆる種類のロジック変更に適用されます。

ブランチ上でのインクリメンタルコードの開発

新しいブランチを作成し、それでビルドを実行すると、ビルドはインクリメンタルに実行されます。単純に、元のブランチから作成したブランチにコミットされた最後のトランザクションが、新しいブランチの最初のビルドの前のトランザクションと見なされます。

例のまとめ

データをインクリメンタルに処理する方法を見てきました:

  • 新しく追加された行を取得し、それらを処理し、出力に追加する
  • 新しく追加された行を取得し、既に出力に存在する行に基づいてそれらをフィルター処理し、出力に追加する
  • 新しく追加された行を取得し、新しい行と既に出力に存在する行に基づいて集計を計算し、出力を新しい集計統計で置き換える
  • インクリメンタルな変換を利用して大規模なデータセットを結合する

また、次の方法を探求しました:

  • インクリメンタルな変換のスキーマまたはロジック変更を処理する
  • 入力の全内容に基づく再ビルドなしにブランチ上でインクリメンタルコードを開発する

インクリメンタル Python エラー

インクリメンタルエラーを理解するためには、トランザクションデータセットビューの概念を読んで理解しておくことが、より簡単であり、時には必要です。

カタログトランザクションエラー

役立つ文脈

ジョブがインクリメンタルに実行されると、そのインクリメンタル入力データセットは未処理のトランザクション範囲のみを含み、完全なデータセットビューは含まれません。

データセットの以下のトランザクション履歴を想像してみてください:

SNAPSHOT (1) --> UPDATE (2) --> UPDATE (3) --> UPDATE (4) --> UPDATE (5)
                                   |
                       最後に処理されたトランザクション

前回データセットが構築されたとき、最新のトランザクションは (3) でした。それ以降に、トランザクション (4) と (5) がコミットされたため、未処理のトランザクションの範囲は (4) から (5) になります。

データセットのビューは、トランザクションの範囲 (1) から (5) です。ビューの上にあるトランザクション(最も新しいもの)は、git との類似性から HEAD と呼ばれることがあります。git と同様に、ブランチはトランザクションへのポインターであるため、ブランチがトランザクション (5) を 指している と言います。いくつかのブランチがいくつかのトランザクションを指すことがあり、ブランチ間でトランザクション履歴が共有されることがあります:

SNAPSHOT (1) ─> UPDATE (2) ─> UPDATE (3) ─> UPDATE (4) ─> UPDATE (5)     [develop]
                                 |  // 開発ブランチ
                                 └─> UPDATE                              [feature-branch]
                                      // 機能ブランチ

エラー: Catalog:TransactionsNotInView

ジョブがインクリメンタルに実行されるためには、ジョブの開始時に一連のチェックが行われます。 これらのチェックの1つは、未処理トランザクションの範囲が厳密にインクリメンタルであることを確認することです(つまり、追加のみのファイル変更。詳細はインクリメンタル計算の要件を参照してください)。未処理トランザクションの範囲と処理済みトランザクションの範囲のファイルを比較することで、これを行います。

ただし、ブランチのHEADが移動された場合、インクリメンタルジョブは一貫性のない状態になります。両方の範囲を比較することはもはや意味がなくなるため、エラーCatalog:TransactionNotInViewがスローされます。

以下の図は、このエラーが発生する方法を示しています。

SNAPSHOT (1) ─> UPDATE (2) ─> UPDATE (3)      ─> UPDATE (4) ─> UPDATE (5)
                   |          (最後に処理された                   (ブランチの前の
                   |            トランザクション)                     HEAD、現在は孤立)
                   |
                   └─> UPDATE (6) --> UPDATE (7, ブランチの現在のHEAD)

ここで処理されるトランザクションの範囲は (1) から (5) で、現在のブランチの HEAD は (7) を指し、現在のビューはトランザクション (1)、(2)、(6)、(7) で構成されています。

これは一貫性のない状態で、全ての処理済みトランザクションがブランチの HEAD の上流にないからです:実際には (3) がそうではありません。言い換えると、前の HEAD (3) はもはや現在のビューの一部ではないため、Catalog:TransactionNotInView が投げられます。

エラー:Catalog:InconsistentBranchesOrder

投げられる可能性のある他の Catalog エラーは、最後に処理されたトランザクション(prevTransaction)がブランチ HEAD(endTransaction)よりも大きい場合の Catalog:InconsistentBranchesOrder です。これは、データセットの HEAD が前のトランザクションよりも前のトランザクションに移動された場合に発生します。

以下に、このエラーがどのように発生するかのダイアグラムを示します:

SNAPSHOT (1) --> UPDATE (2) --> UPDATE (3) --> UPDATE (4) --> UPDATE (5)
                                   |                             |
                               現在のHEAD (現在のヘッド)          最後に処理されたトランザクション

エラーの修復

ブランチの HEAD は以下の 2 つの理由で変更されます:

  • ユーザーが Catalog のエンドポイントを使用して意図的にブランチの HEAD を更新した。
  • トランザクションが変換ジョブを通じてコミットされなかった。例えば、Code Workbook でブランチをマージすると、データセットも「マージ」されます。
  • ただし、Code Workbook のデータセットのトランザクションは常に SNAPSHOT なので、一貫性のない状態にはなりません。

これを修復するためには、以下のいずれかを行う必要があります:

  • セマンティックバージョンを上げるなどして、トランスフォームをスナップショットとして実行します。これにより、新しいデータセットビューが開始され、上記のインクリメンタルチェックがリセットされます。
  • 手動でブランチの HEAD を、すでに処理された範囲の下流にあるトランザクションを指すように更新します。これは、最新の処理済みトランザクションを parentRef として updateBranch2 エンドポイントを使用して行う必要があります。ただし、このエンドポイントの使用は経験豊富なユーザーにのみ推奨します。