データ統合ストリーミングソースKafka

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

Kafka

Foundry を Kafka に接続して、Kafka キューから Foundry ストリームにリアルタイムでデータを読み込みます。

サポートされている機能

機能ステータス
ストリーミング同期🟢 一般提供
データコネクションエクスポート🟢 一般提供
探索🟢 一般提供

データモデル

キー(バイナリ)値(バイナリ)
London{"firstName": "John", "lastName": "Doe"}
Paris{"firstName": "Jean", "lastName": "DuPont"}

Kafka コネクタはメッセージの内容を解析せず、Foundry に任意のタイプのデータを同期させることができます。すべての内容は、value 行の下にアップロードされ、未解析のままです。データを解析するには、下流のストリーミング変換(例:Pipeline Builderparse_json)を使用します。key 行には、メッセージとともに Kafka に記録されたキーが表示されます。メッセージにキーが含まれていない場合、値は null になります。

パフォーマンスと制限

コネクタは、ソースの Kafka トピックのパーティション数に関係なく、常に単一のコンシューマースレッドを使用します。

ストリーミング同期は、一貫性のある長期間実行されるジョブを意図しています。ストリーミング同期が中断されると、期待される結果に応じて潜在的なアウトエージが発生する可能性があります。

現在、ストリーミング同期には以下の制限があります。

  • エージェント接続からのジョブは、アップグレードを反映するために、メンテナンスウィンドウ中に再起動されます(通常は週に 1 回)。想定されるダウンタイムは 5 分未満です。
  • 直接接続からのジョブは、48 時間ごとに少なくとも一度再起動されます。想定されるダウンタイムは、シングルデジットの分数です(リソースの可用性がジョブの即時再開を許可する場合)。

ストリーミング同期の実行には、エージェント接続を使用することで、パフォーマンス、帯域幅、可用性が向上することをお勧めします。

セットアップ

  1. Data Connectionアプリを開き、画面右上の**+ 新規ソース**を選択します。
  2. 利用可能なコネクタタイプから Kafka を選択します。
  3. インターネット経由で 直接接続を使用するか、 中間エージェント経由で接続を選択します。
    • Kafka コネクタのセットアップに成功し、ダウンタイムを減らすために、ソースごとに 2 つのエージェントを接続することをお勧めします。エージェントのメンテナンスウィンドウが重複しないように注意してください。
  4. 以下のセクションにある情報を使用して、コネクタの設定を続ける追加の構成プロンプトに従ってください。

Foundry でコネクタをセットアップする方法について詳しくは、こちらをご覧ください。

コネクタタイプのページで Kafka が表示されない場合は、Palantir の担当者に連絡してアクセスを有効にしてください。

URL

パラメーター必須?デフォルト説明
URLはいなしKafka ブローカーの URL を 1 行に 1 つずつ追加してください。

認証

Kafka 接続を認証するための資格情報方法を選択します:SSL、Azure AD、または NONE。

設定された資格情報は、以下の操作を許可する必要があります。

  • Topic リソース:
    • ストリーミング同期と探索のための Read
    • ストリーミングエクスポートのための Write

SSL

SSL 認証は、標準の Kafka SSL および SASL_SSLプロトコルに対応しています。

SSL で認証するには、以下の構成オプションを完了してください。

パラメーター必須?デフォルト説明
エンドポイント識別アルゴリズムはいHTTPSHTTPS:ブローカーのホスト名がブローカーの証明書のホスト名と一致することを確認します。
NONE:エンドポイント識別を無効にします。
相互 SSL 有効化いいえ無効相互 TLS (mTLS)を有効化します。追加の構成が必要です。
SASL の使用いいえいいえSASL 認証を有効にします。

相互 TLS(双方向 SSL)を有効にする場合は、プライベートキーのパスワードを追加してください。

パラメーター必須?デフォルト説明
SSL キーパスワードはいなしプライベートキーを復号化するために必要なパスワード。

SASL 認証を有効にする場合は、追加の構成を完了してください。

パラメーター必須?デフォルト説明
SASL メカニズムいいえいいえ資格情報を暗号化するアルゴリズムを選択します。
saslJaasConfigUsernameはいなしユーザー名
SASL JAAS config passwordはいなしパスワード
SASL クライアントコールバックハンドラークラスはいなしSASL クライアント用のデフォルトコールバックハンドラーを表示します。SASL コールバックハンドラーに関する詳細は、Java SASL API ドキュメントを参照してください。

Azure AD

Azure AD 認証モードは、Azure Event Hubs 用の Kafka インターフェースに適用されます。このモードでは、Azure AD サービスプリンシパルが必要です。サービスプリンシパルの作成方法と Event Hubs へのアクセス設定方法については、Azure ドキュメントを確認してください。

パラメーター必須?デフォルト
テナント IDはいなし
クライアント IDはいなし
クライアントシークレットはいなし

なし

Kafka の標準 PLAINTEXT プロトコルに対応しています。

認証や SSL なしでコネクタを設定することは強くお勧めしません。これにより、コネクタと Kafka ブローカー間で暗号化されていないデータが送信されます。この設定は、セキュアなネットワーク内でのみ使用してください。

ネットワーキング

コネクタは、KafkaブローカーのURLへのアクセスが必要です。直接接続を使用する場合は、すべてのブートストラップサーバーURLに対してDNSエグレスポリシーを作成します。

証明書とプライベートキー

SSLやTLSの追加のクライアント証明書やサーバー証明書、プライベートキーを設定する必要があります。

SSL

SSL接続はサーバー証明書を検証します。通常、SSL検証は証明書チェーンを通じて行われます。デフォルトでは、エージェントと直接接続のランタイムは、ほとんどの標準的な証明書チェーンを信頼します。しかし、接続するサーバーが自己署名証明書を持っている場合、またはホスト名検証が接続を妨げる場合、コネクタはその証明書を信頼しなければなりません。使用する正しい証明書については、Kafka管理者に連絡してください。

Data Connectionでの証明書の使用についての詳細を学びましょう。

相互 TLS (mTLS)

ユーザーのKafkaクラスタは、サーバーとクライアントの両方がmTLSを通じて認証することを求めるかもしれません。mTLSを有効にするには、以下のものを設定する必要があります:

コネクタの実行タイプに基づいて、クライアントのプライベートキーを設定するための以下の手順をフォローしてください。

エージェント用のクライアントプライベートキーの設定
  1. 下記のブートストラッパキーストアをコピーして、ホストの別の場所に保存します。ブートストラッパキーストアとトラストストアは、エージェントが再起動するたびに再生成されます。デフォルトのキーストアに行った変更はすべて、再起動時に上書きされます。

    Copied!
    1 2 $ mkdir ~/security $ cp <bootvisor_root>/var/data/processes/<bootstrapper_dir>/var/conf/keyStore.jks ~/security/
  2. Javaのkeytoolコマンドラインツールを使用して、顧客が提供したキーストアからキーをインポートし、コピーしたエージェントキーストアにインポートします。このツールがまだインストールされていない場合は、エージェントにバンドルされているJDKのbinディレクトリから見つけることができます。

    Copied!
    1 2 3 4 $ keytool -importkeystore -srckeystore <kafka_keystore.jks> -destkeystore ~/security/keyStore.jks Importing keystore kafka_keystore.jks to keyStore.jks... Enter destination keystore password: keystore Enter source keystore password:
    • keytool -listコマンドを使用して、キーがコピーされたキーストアに追加されたかどうかを確認することができます:

      Copied!
      1 2 3 4 5 6 7 8 9 10 11 $ keytool -list -keystore ~/security/keyStore.jks Enter keystore password: Keystore type: jks Keystore provider: SUN Your keystore contains 2 entries kafka, 15-Dec-2022, PrivateKeyEntry, Certificate fingerprint (SHA-256): A5:B5:2F:1B:39:D3:DA:47:8B:6E:6A:DA:72:4B:0B:43:C7:2C:89:CD:0D:9D:03:B2:3F:35:7A:D4:7C:D3:3D:51 server, 15-Dec-2022, PrivateKeyEntry, Certificate fingerprint (SHA-256): DB:82:66:E8:09:43:30:9D:EF:0A:41:63:72:0C:2A:8D:F0:8A:C1:25:F7:89:B1:A3:6E:6F:C6:C5:2C:17:CB:B2
  3. インポートされたキーのパスワードを更新するためにkeytool -keypasswdコマンドを使用します。エージェントキーストアは、キーとキーストアのパスワードが一致することを要求します。

    Copied!
    1 2 $ keytool -keypasswd -alias kafka -new keystore -keystore ~/security/keyStore.jks Enter keystore password:
  4. Data Connectionで、コネクタエージェントに移動し、エージェント設定タブを開きます。設定の管理セクションで、詳細を選択し、エージェントタブを選択し、keyStoreを新しくコピーしたキーストアを指すように更新します。次に、keyStorePasswordを追加し、適切な値(デフォルトではkeystore)を設定します。

    Copied!
    1 2 3 4 5 security: keyStore: ~/security/keyStore.jks keyStorePassword: keystore trustStore: var/conf/trustStore.jks ...
  5. 最後に、エクスプローラータブを選択し、keyStorePathkeyStorePasswordの両方を更新します。新しい設定を保存します。

    Copied!
    1 2 3 4 5 security: keyStorePath: ~/security/keyStore.jks keyStorePassword: keystore trustStorePath: var/conf/trustStore.jks ...
  6. エージェントを再起動します。

エージェントタブの設定時にフィールド名はkeyStoreとなり、エクスプローラータブではkeyStorePathとなることに注意してください。ブートストラッパの設定には変更は必要ありません。

直接接続のクライアントプライベートキーの設定

コネクタ設定ページのクライアント証明書とプライベートキーの設定セクションで、プライベートキーをアップロードします。表示されるポップアップでエイリアスkafkaを使用し、プライベートキーとクライアント証明書を追加します。

Kafkaからデータを同期

ストリーミング同期の設定チュートリアルで、Kafkaとの同期の設定方法を学びましょう。

スキーマレジストリの統合

Kafkaスキーマレジストリは、すべてのスキーマのバージョン履歴を維持する中央集権型のストレージシステムとして機能します。レジストリは AvroProtobuf、および JSON スキーマと互換性があり、SerDes(シリアライザ/デシリアライザ)を使用してスキーマ形式とシリアライズされたデータ間の変換を容易にします。

Kafkaスキーマレジストリを効果的に活用するには、関連する Kafka トピックのスキーマを登録し、 スキーマレジストリURL をソース設定に追加する必要があります。この調整により、コネクタは生のバイトを対応するデータ型に変換する能力を得ます。

たとえば、標準的な抽出では、通常、Kafkaからの生のバイトをFoundryに取り込むことになります。これは以下のように描かれています:

標準的なバイナリKafka抽出

しかし、スキーマレジストリが設定されていると、コネクタはバイトの基本スキーマを識別し、それらを一等のFoundry型に変換することができます。以下に示すように:

Avro Kafka抽出

この機能は、面倒な型変換の必要性を排除することで、下流のパイプラインを大幅に合理化することができます。さらに、スキーマレジストリが中央集権型のスキーマ管理と、スキーマが進化するにつれての互換性チェックを提供するため、データの一貫性保証を向上させます。

Kafkaへのデータエクスポート

コネクタは、Data Connectionを介して外部Kafkaクラスタにストリームをエクスポートすることをサポートしています。

Kafkaへのエクスポートを開始するには、まずKafkaコネクタのエクスポートを有効にします。次に、新しいエクスポートを作成します

エクスポート設定オプション

オプション必須?デフォルト説明
TopicはいN/AエクスポートしたいKafkaトピック。
Linger millisecondsはい0Kafkaにレコードを送信する前に待つミリ秒数。待機時間中に累積したレコードは、エクスポート時にバッチ化されます。この設定はデフォルトで0に設定されており、レコードはすぐに送信されます。ただし、このデフォルト設定では、Kafkaインスタンスへのリクエストが増える可能性があります。
Key columnいいえ未定義Kafkaにレコードを公開する際にキーとして使用したいFoundryストリームの行。Nullキーはサポートされていません。選択した行がエクスポートされるストリームのすべてのレコードに対して入力されていることを確認してください。
Value columnいいえ未定義エクスポートしたいFoundryストリームの行。指定しない場合、行からのすべてのフィールドがバイトとしてシリアライズされ、メッセージの本文としてKafkaにエクスポートされます。
Enable Base64 Decodeいいえ無効Foundryストリームのバイナリデータは、内部的に保存されるときにBase64でエンコードされます。有効にすると、このフラグにより、エクスポートする前にバイナリデータがデコードされます。これは、 Key columnValue column の両方が指定されている場合にのみ有効にできます。

エクスポートタスク設定(レガシー)

エクスポートタスクを通じてKafkaにエクスポートすることはお勧めしません。可能ならば、既存のエクスポートタスクは私たちが推奨するエクスポート機能を使用して移行すべきです。以下のドキュメンテーションは歴史的な参照のために提供されています。

エクスポートタスクを使用してデータのエクスポートを開始するには、エクスポートしたいKafkaコネクタを含むプロジェクトフォルダーに移動します。コネクタ名を右クリックし、 Create Data Connection Task を選択します。

Data Connectionビューの左側のパネルで:

  1. Source 名が使用したいKafkaコネクタと一致していることを確認します。
  2. Streaming dataset タイプの Input として dataset を追加します。入力データセットはエクスポートされるFoundryデータセットです。
  3. Streaming export タイプの Output として dataset を追加します。出力データセットはタスクの実行、スケジューリング、監視に使用されます。
  4. 最後に、テキストフィールドにYAMLブロックを追加して、タスク設定を定義します。

YAMLを作成するときに以下のオプションを使用します:

オプション必須?デフォルト説明
maxParallelismはいいいえデータをエクスポートするために使用できる並列スレッドの最大数。実際のスレッド数は、入力Foundryストリームのパーティション数によって決まります( maxParallelism よりも少ない場合)。
topicはいいいえデータがプッシュされるトピックの名前。
clientIdはいエクスポートタスクに使用する識別子。識別子はKafka client.id にマップされます。詳細については、Kafkaドキュメンテーションを参照してください。
batchOptionsはいいいえ下記の batchOptions 設定を参照してください。
keyColumnいいえいいえ入力ストリーミングデータセットの行の名前。この行の値はエクスポートメッセージのキーとして使用されます。このプロパティを省略すると、キーの値として null がエクスポートされます。
valueColumnいいえいいえ入力ストリーミングデータセットの行の名前。この行の値はエクスポートメッセージの値として使用されます。このプロパティを省略すると、すべての行(文字列化されたJSONオブジェクトとして)が value フィールドの下にエクスポートされます。
enableIdempotenceいいえtrue詳細については、Kafkaドキュメンテーションを参照してください。
useDirectReadersはいいいえ常に false に設定します。例に従って設定します。
transformsはいいいえ下記の例の設定を参照してください。

以下のオプションを使用して batchOptions を設定します:

オプション必須?デフォルト説明
maxMillisecondsはいいいえ出力トピックに書き込む前に待つ最大期間(ミリ秒)。この値を下げると遅延が減り、増やすとネットワークオーバーヘッド(リクエスト数)が減ります。この値は、バッチが maxRecords の制限に先に達した場合を除き、使用されます。
maxRecordsはいいいえ出力トピックに書き込む前にバッファリングするメッセージの最大数。この値を下げると遅延が減り、増やすとネットワークオーバーヘッド(リクエスト数)が減ります。この値は、バッチが maxMilliseconds の制限に先に達した場合を除き、使用されます。

以下にエクスポートタスク設定の例を示します:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 type: streaming-kafka-export-task # タスクのタイプを指定します。ここでは"streaming-kafka-export-task"を指定します。 config: # 設定パラメータを指定します。 maxParallelism: 1 # 最大並列処理数を指定します。ここでは1を指定します。 topic: test-topic # Kafkaのトピックを指定します。ここでは"test-topic"を指定します。 clientId: client-id # クライアントIDを指定します。ここでは"client-id"を指定します。 keyColumn: key # キーカラムを指定します。ここでは"key"を指定します。 valueColumn: value # バリューカラムを指定します。ここでは"value"を指定します。 batchOptions: # バッチオプションを指定します。 maxMilliseconds: 5000 # 最大ミリ秒数を指定します。ここでは5000を指定します。 maxRecords: 1000 # 最大レコード数を指定します。ここでは1000を指定します。 transforms: # 変換を指定します。 transformType: test # 変換タイプを指定します。ここでは"test"を指定します。 userCodeMavenCoords: [] # ユーザーコードのMaven座標を指定します。ここでは空配列を指定します。 useDirectReaders: false # 直接のリーダーの使用を指定します。ここでは"false"を指定します。

エクスポートタスクを設定した後、右上の角にある保存を選択してください。