注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

インクリメンタルおよびバッチ再インデックス化

以下のドキュメンテーションは、標準的なObject Storage V2データストアに特化したものです。Object Storage V1(Phonograph)のインデックス化動作に関する情報は、OSv1ドキュメンテーションをご参照ください。

インクリメンタルインデックス化(デフォルト)

標準的なObject Storage V2データストアは、データソース内の新しいトランザクションごとにデータの差異を自動的に計算し、新しいデータの更新のみをインクリメンタルにインデックス化します。Funnelパイプラインは、すべてのオブジェクトタイプでデフォルトでインクリメンタルインデックス化を使用します。インクリメンタルインデックス化により、Funnelパイプラインはすべてのデータを再インデックス化するよりも迅速に実行できます。

例えば、100行のデータソースにバックアップされた100個のオブジェクトインスタンスを持つオブジェクトタイプがあるとします。新しいデータの更新でその10行が変更された場合、入力データソースのトランザクションタイプに関係なくすべての100個のオブジェクトインスタンスを再インデックス化するのではなく、Funnelパイプラインは、変更された10行のみを含む新しい APPEND トランザクションを変更ログデータセットに作成します。

バッチインデックス化(特殊なケース)

Funnelパイプラインは、次の2つのタイプのケースでバッチインデックス化(すべてのオブジェクトインスタンスが再インデックス化される)を使用します:

  • 入力データソースの行の一定の割合以上が同一のトランザクションで変更されると、変更ログを計算してインクリメンタルにインデックス化するよりも、再インデックス化が計算上安価で迅速になることがあります。デフォルトの閾値は、同一のトランザクションで行が80%変更されるように設定されています。
  • オブジェクトタイプスキーマの特定の変更は、Funnel置換パイプラインを必要とし、これは背後で全く新しいFunnelパイプライン(OSv2インデックスを含む)を作成します。

インクリメンタルデータセットのインデックス化

Object Storage V2は、インクリメンタルデータセットにバックアップされたオブジェクトタイプを同期する際に、「最新のトランザクションが勝つ」戦略を使用します。データセットが同じプライマリキーを持つ複数の行を含んでいる場合、最新のトランザクションの行のデータがオントロジーに存在します。単一のトランザクション内でプライマリキーが重複することはありません。この動作は、ユーザーの編集とデータソースの更新の競合の取り扱い方法とは関係ありません。

APPENDトランザクションを通じて行に更新が行われるインクリメンタルデータセット、通常は変更ログデータセットと呼ばれるものを考えてみましょう。新しいバージョンの同じデータは、更新された値を持つがプライマリキーは同じ新しい行として、一つのトランザクションでデータセットに追加されます。変更ログデータセットには、Boolean型の is_deleted 行も存在することがあります。 is_deleted 行の値が真である場合、その行は削除されたとみなされるべきです。

Object Storage V2は、変更ログデータセットを以下のように同期します:

  • プライマリキーが複数のトランザクションに現れる場合、最新のトランザクションの行が保持されます。
  • 各トランザクションは、プライマリキーごとに最大1行を含む必要があります。
  • データセットがObject Storage V1の変更ログである場合、Object Storage V2は is_deleted 行を尊重しますが、順序付け行は尊重しません。

おそらく、変更ログデータセットに対してインクリメンタルウィンドウ変換を実行し、各トランザクションが最大でプライマリキーごとに1行を含むようにする必要があります。

Copied!
1 2 3 4 5 6 7 8 9 10 11 from pyspark.sql.window import Window from pyspark.sql import functions as F # Window関数を定義します。データを'primary_key'で分割し、'ordering'列の降順に並べ替えます。 ordering_window = Window().partitionBy('primary_key').orderBy(F.col('ordering').desc()) # Window関数を使用して、各パーティション内で行の順位(ランク)を計算し、新しい列'rank'を作成します。 df = df.withColumn('rank', F.row_number().over(ordering_window)) # フィルタリングを行います。'rank'が1(つまり、各パーティション内で最も値が大きい行)であり、かつ'is_deleted'がFalse(削除されていない)である行だけを残します。 df = df.filter((F.col('rank') == 1) & ~F.col('is_deleted'))

ストリームインデックス作成

Object Storage V2 のストリーミングでは、「最新の更新が勝つ」戦略を使用し、すべてのストリームがチェンジログストリームのように扱われます。ユーザーのイベントがソースから順序を持たずに出てくると、ユーザーのオントロジーに誤ったデータが入ります。ユーザーが入力ストリームの順序を保証できる場合、Object Storage V2 のストリーミングはユーザーの更新を同じ順序で処理します。

Object Storage V2 のストリーミングを使用する前に以下の事項を考慮してください:

  • Object Storage V2 のストリーミングは is_deleted 行をサポートしていません。代わりに、オントロジーに is_deleted 行を作成し、ユーザーのすべてのオブジェクトセットに is_deleted=false フィルター処理するを追加してください。
  • 一時的な制限のため、Object Storage V2 のストリーミングオブジェクトセットはサイズが小さい方が良い(1000万オブジェクト未満)。
  • Object Storage V2 のストリーミングは Actions をサポートしていません。