注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
以下のドキュメンテーションは、標準的なObject Storage V2データストアに特化したものです。Object Storage V1(Phonograph)のインデックス化動作に関する情報は、OSv1ドキュメンテーションをご参照ください。
標準的なObject Storage V2データストアは、データソース内の新しいトランザクションごとにデータの差異を自動的に計算し、新しいデータの更新のみをインクリメンタルにインデックス化します。Funnelパイプラインは、すべてのオブジェクトタイプでデフォルトでインクリメンタルインデックス化を使用します。インクリメンタルインデックス化により、Funnelパイプラインはすべてのデータを再インデックス化するよりも迅速に実行できます。
例えば、100行のデータソースにバックアップされた100個のオブジェクトインスタンスを持つオブジェクトタイプがあるとします。新しいデータの更新でその10行が変更された場合、入力データソースのトランザクションタイプに関係なくすべての100個のオブジェクトインスタンスを再インデックス化するのではなく、Funnelパイプラインは、変更された10行のみを含む新しい APPEND
トランザクションを変更ログデータセットに作成します。
Funnelパイプラインは、次の2つのタイプのケースでバッチインデックス化(すべてのオブジェクトインスタンスが再インデックス化される)を使用します:
Object Storage V2は、インクリメンタルデータセットにバックアップされたオブジェクトタイプを同期する際に、「最新のトランザクションが勝つ」戦略を使用します。データセットが同じプライマリキーを持つ複数の行を含んでいる場合、最新のトランザクションの行のデータがオントロジーに存在します。単一のトランザクション内でプライマリキーが重複することはありません。この動作は、ユーザーの編集とデータソースの更新の競合の取り扱い方法とは関係ありません。
APPEND
トランザクションを通じて行に更新が行われるインクリメンタルデータセット、通常は変更ログデータセットと呼ばれるものを考えてみましょう。新しいバージョンの同じデータは、更新された値を持つがプライマリキーは同じ新しい行として、一つのトランザクションでデータセットに追加されます。変更ログデータセットには、Boolean型の is_deleted
行も存在することがあります。 is_deleted
行の値が真である場合、その行は削除されたとみなされるべきです。
Object Storage V2は、変更ログデータセットを以下のように同期します:
is_deleted
行を尊重しますが、順序付け行は尊重しません。おそらく、変更ログデータセットに対してインクリメンタルウィンドウ変換を実行し、各トランザクションが最大でプライマリキーごとに1行を含むようにする必要があります。
Copied!1 2 3 4 5 6 7 8 9 10 11
from pyspark.sql.window import Window from pyspark.sql import functions as F # Window関数を定義します。データを'primary_key'で分割し、'ordering'列の降順に並べ替えます。 ordering_window = Window().partitionBy('primary_key').orderBy(F.col('ordering').desc()) # Window関数を使用して、各パーティション内で行の順位(ランク)を計算し、新しい列'rank'を作成します。 df = df.withColumn('rank', F.row_number().over(ordering_window)) # フィルタリングを行います。'rank'が1(つまり、各パーティション内で最も値が大きい行)であり、かつ'is_deleted'がFalse(削除されていない)である行だけを残します。 df = df.filter((F.col('rank') == 1) & ~F.col('is_deleted'))
Object Storage V2 のストリーミングでは、「最新の更新が勝つ」戦略を使用し、すべてのストリームがチェンジログストリームのように扱われます。ユーザーのイベントがソースから順序を持たずに出てくると、ユーザーのオントロジーに誤ったデータが入ります。ユーザーが入力ストリームの順序を保証できる場合、Object Storage V2 のストリーミングはユーザーの更新を同じ順序で処理します。
Object Storage V2 のストリーミングを使用する前に以下の事項を考慮してください:
is_deleted
行をサポートしていません。代わりに、オントロジーに is_deleted
行を作成し、ユーザーのすべてのオブジェクトセットに is_deleted=false
フィルター処理するを追加してください。