データ統合パイプラインのビルドストリーミングパイプラインストリーミングキー

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

ストリーミングキー

Foundryのストリームでは、1つ以上の列をキー列として指定し、解決されたレコードを識別するためのプライマリキーを指定できます。以下のセクションでは、ストリーミングパイプラインのパーティションキーとプライマリキーの設定と維持方法について説明します。

パーティションキー

一般的に、Palantirプラットフォームのプライマリキーは、データベースレコードを一意に識別するために使用されます。ただし、Foundryストリームのパーティションキーは、同じキーを持つレコードをグループ化し、特定のデバイスに対するすべてのリーディングや特定の顧客に対するすべてのトランザクションなどが該当します。バッチパイプラインのプライマリキーとは異なり、ストリームのパーティションキーはレコードを一意に識別しないため、レコードの重複除去は行われません。

Foundryのキーは、ユーザーが特定のキーに対するすべてのレコードの順序を維持できるようにします。レコードがFoundryに入ると、内部的にKafkaに格納されます。Kafkaは、同じパーティションに書き込まれたレコードが読み書き時に順序を維持することを保証します。キーが指定されていないレコードがFoundryに送信されると、内部のKafkaトピックの任意のパーティションにラウンドロビン方式で書き込まれます。ただし、ユーザーがレコードのキーを指定すると、そのキーの専用パーティションにレコードが書き込まれ、下流で消費される際に順序が維持されます。

同様に、Flinkのストリーミングジョブは、入力ソースに設定されたパーティションキー列に基づいて順序を自動的に維持します。Flinkの変換ジョブは、1つ以上の並列パーティションで変換操作を実行することがあります。パーティションキーが設定された入力ストリームの場合、Flinkのジョブは、すべてのキー列の値が同じレコードを同じ並列オペレーターインスタンスに自動的に送信します。特に再キー設定されていない限り(Key by変換として)、パイプライン全体のパーティション化と順序は、ソース上のキー列と行の値によって決定されます。変換ロジックが列を削除したり、値を上書きしたりしても、これに影響はありません。

プライマリキー: 変更データキャプチャ(CDC)モード

Foundryストリームのプライマリキーは、リレーショナルデータベースやバッチデータセットで使用されるプライマリキーと同様の方法で機能します。ただし、ストリーミングのプライマリキーは、解決されたレコードを一意に識別する重複排除列のセットで構成され、順序指定列は指定されません。

プライマリキーは、Foundryのストリーム内のスキーマに存在するメタデータの一部です。キーは、保存されたデータの内容やストリーミングデータパイプラインや変換が適用される方法に影響を与えません。プライマリキーは、一部のFoundryのコンシューマがデータを読み取る方法を制御します。フルデータ変更ログと考えることができ、プライマリキーは、すべての変更を適用した後のデータの重複排除された現在のビューを計算する方法をコンシューマに伝えます。

現在のビューは、各キーの最も最近のストリーム化されたレコードのみを含むデータのフィルタリングされたビューです。キーの最新のレコード全体が常に保持されますが、nullが含まれていてもかまいません。削除された列が指定されており、キーの最も最近のストリーム化されたレコードがその値をtrueに設定している場合、そのレコードはフィルタリングされます。

以下の例は、プライマリキーとデータストリームを示しており、最も最近のストリーム化された行がテーブルの上部に表示されます。

プライマリキー: {Deduplication column(s): [Key], isDeletedColumn: Optional[isDeleted]}

キー削除された
キー2nullfalse
キー1thirdValtrue
キー2secondValfalse
キー1firstValfalse

この変更ログのストリームは、CDCを意識したコンシューマによって、次のような現在のビューとして読み取られます。

キー削除された
キー2nullfalse

ストリーミングデータにプライマリキーを設定する場合は、順序を維持し、重複排除されたデータビューを正しく解決するために、同じ列をパーティションキーとして設定する必要があります。ストリーミングの設定インターフェースでプライマリキーを設定すると、同じ列が自動的にパーティションキー列として追加されます。

次の2つのCDCを意識したジョブは、プライマリキーが設定されたストリーミングデータを現在のビューとして自動的に読み取ります。

  1. プライマリキーが設定されたストリームでオントロジーをハイドレートする: 新しい更新レコードにより、必要に応じて新しいオブジェクトが作成、更新、または削除されます。
  2. ストリームのアーカイブデータセットが任意のSpark変換ジョブによって読み取られる: 変換が行われる前に、ソースが重複排除されます。特に、アーカイブデータセットのほとんどの操作はSparkジョブを実行し、データセットのプレビューやContourの分析も含まれます。アーカイブデータセットを表示すると、データ自体が完全な変更ログを含んでいても、常に重複排除された現在のビューとして表示されます(データをファイルとしてダウンロードする場合も同様です)。完全な変更ログは、Sparkを実行しないストリーミングジョブによって処理されます。これには、リプレイも含まれます。

キーの伝播

Pipeline Builderは、パイプライン変換を通じてパーティションキーとプライマリキー列の進化を追跡し、有効なキーを出力ストリームのスキーマに書き込みます。変換がキー列を無効にしない場合、同じパーティショニングおよび重複排除の指示が、任意の数の下流パイプラインのシーケンスで自動的に維持されます。

キー列の名前を変更すると、キーには新しい列名が含まれるようになります。同様に、キー列を削除または上書きする変換を適用すると、その列がキーから削除されます。キー列の内容を上書きすることは、新しい順序保証や新しい重複排除戦略を示す可能性があるため、Pipeline Builderはキーから列を完全に削除します。以前に上書きされたキー列を維持したい場合は、再度Key by変換を適用する必要があります。パーティションキー列が削除または上書きによって削除された場合でも、再キー設定しない限り、パイプラインの残りの部分で同じ順序保証が維持されます。

すべてのパーティションキーまたは重複排除CDCキーが削除または上書きされると、キーは完全に削除されます。キー列の削除や上書きに注意してください。これにより、順序保証の喪失や重複排除戦略の変更など、予期しない結果が生じる可能性があります。

現在、ユーザー定義関数(UDF)に対しては、キーは決して伝播しません。関数がユーザー定義であるため、ユーザーが意図するキー伝播戦略がない場合は、推論する方法がありません。キーを伝播させるつもりであれば、UDFの後にKey by変換を適用してください。

また、状態を持つ変換に対しては、プライマリキーが伝播しません(ほとんどの変換は状態を持っていません)。

ストリーミングキーの使用

Pipeline Builderアプリケーションのストリーミングパイプラインで、Key by変換をグラフに追加します。ここで設定したキーは、入力データにあったキーを上書きし、置き換えます。

パーティションキーのみを設定する場合は、CDC modeオプションをオフにして、Key by columnsリストのみを指定します。CDCモードでなければ、他のパラメーターは必要ありませんし、使用できません。

プライマリキーとパーティションキーの両方を設定するには、CDCモードオプションをオンにします。isDeleted列がある場合は、プライマリキーが削除されたフィールドにオプションで指定します。ストリーミングのユースケースでは、オプションのプライマリキーの順序付け列パラメーターを空白のままにすることを強くお勧めします。変換は、パーティションキー列とプライマリキーの重複排除列が同じであるプライマリキーとパーティションキーの両方を設定します。プライマリキーの順序付け列パラメーターは、バッチジョブでアーカイブデータセットを消費するときにのみ影響を与え、ストリーミングが重複排除について考える方法には影響を与えません。順序付け列を指定できるようにするオプションは、バッチ変換のユーザーやバッチ専用の方法でストリームアーカイブを消費することを意図しているユーザーのために、下位互換性が利用できます。

現在のキーを確認する

以下のセクションでは、Foundryストリームでストリーミングキーのロジックを見つけて確認する方法について説明します。

パーティションキーを確認する

ストリーミングデータセットを開き、詳細タブに移動し、スキーマタブを開いてJSON形式のデータスキーマを表示します。

includeInPartitioningを検索します。これは、プライマリキーの一部である各列のfieldSchemaListの要素に表示されます。

"customMetadata": {
  "includeInPartitioning": true
  // パーティション分割に含めるかどうかを指定します。true の場合、含めます。
}

includeInParitioning がスキーマフィールドに表示されない場合、ストリームはキーが設定されておらず、データが保存または処理される際の順序が保証されません。手動でキーを追加するには、スキーマをJSONテキストとして編集し、パーティションキーの列として設定したい各スキーマフィールド(行)にカスタムメタデータ(上記の説明に従って)を挿入してください。

ストリームに既に1つ以上のパーティションキーがある場合、新しいパーティションキー列を追加すると、パーティションが増えるため、順序の保証が 弱く なります。順序は、すべてのパーティションキー列の値が同じである行に対してのみ保証されます。

デプロイ前に、パイプラインへの入力ストリームにパーティションキー列が設定されている場合、そのソースとそのすべての変換の順序がパイプライン全体で保証されます(意図的に再キー設定しない限り)。パーティションキー列は、データプレビューにキーシンボルとして表示される場合があります。

主キーを確認する

ストリーミングデータセットを開き、詳細 タブに移動してから、スキーマ タブを開き、JSON形式のデータスキーマを表示します。

primaryKey という名前のJSONプロパティが表示されます。ストリームに uniqueCol1 および uniqueCol2 という重複除去列があり、isDeleted 列が isDeletedCol である場合、スキーマは次のように表示されます。

  "primaryKey": {
    // 主キー
    "columns": [
      // ユニークな列1
      "uniqueCol1",
      // ユニークな列2
      "uniqueCol2"
    ],
    // 解決方法
    "resolution": {
      // 重複タイプ
      "type": "duplicate",
      // 重複オブジェクト
      "duplicate": {
        // 解決戦略
        "resolutionStrategy": {
          // カスタム戦略タイプ
          "type": "customStrategy",
          // カスタム戦略オブジェクト
          "customStrategy": {}
        },
        // 削除列
        "deletionColumn": "isDeletedCol"
      }
    }
  }

プライマリキーが設定されていない場合、スキーマでは null が表示されます。

"primaryKey": null // 主キー: null

主キーを手動で設定または削除するには、上記の形式でキーを指定するためにスキーマ JSON を編集するか、キーを削除するために null を使用できます。主キーを手動で設定する場合、パーティションキーの列も同じ列に設定することを強くお勧めします。

順序を保証する唯一の方法は、ストリーミングデータフロー全体にパーティションキー列を設定することです。設定すると、パーティションキー列は自動的に下流に伝播します。ストリームが1つのパーティションのみを持つように設定されていても、Flink アプリケーションが非決定的にレコードをスケールおよび処理する方法のため、必ずしも順序が保証されるわけではありません。

ストリーミングキーのベストプラクティス

パーティションキーを慎重に選択してください。効率的に分散されていないレコードの結果としてキーが負荷を人工的に増加させ、スループットを制限することがあります。順序がユーザーのユースケースに重要な場合は、メール ID、顧客 ID、または組織 ID など、順序を維持したい汎用のグループ識別子にパーティションキーを設定します。順序がユースケースに重要でない場合は、キーとして一意の ID を使用するか、ストリームにキーを全く使用しないことを選択できます。

最終ストリーミング出力の順序保証は、ストリーミングシリーズ(Kafka トピックによってバックアップされる)および変換(Flink ジョブ)の中で最も弱い保証と同じくらい強力になります。したがって、Foundry にレコードを取り込む最初のストリーミング抽出から始めて、データフロー全体に正しいパーティションキーが設定されていることを確認し、所望の順序が維持されていることを確認してください。順序保証は、Foundry に抽出しているシステムよりも強力になることはありません。たとえば、Kafka から抽出するために Kafka コネクタを使用している場合は、パーティションキー列を Kafka キー列に等しく設定し、Foundry がシステムで同等の順序保証を維持できるようにします。

さらに、データ変換の一連の操作中にパーティション列(および順序保証)を完全に変更すると、問題が発生することがあります。新しい Key by 変換を適用する前に異なる順序が保証されていた場合、変換は新しく追加されたキー列からの順序が異なるレコードを受信し、変換シリーズ中に誤った順序のままになります。