データ接続と統合Available connectorsKafka

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

Kafka

Foundry を Kafka に接続して、Kafka キューから Foundry ストリームにリアルタイムでデータを読み込みます。

対応機能

機能ステータス
探索🟢 一般利用可能
ストリーミング同期🟢 一般利用可能
ストリーミングエクスポート🟢 一般利用可能

データモデル

key (binary)value (binary)
London{"firstName": "John", "lastName": "Doe"}
Paris{"firstName": "Jean", "lastName": "DuPont"}

Kafka コネクタはメッセージ内容を解析せず、任意のタイプのデータを Foundry に同期できます。すべての内容は解析されずに value 列の下にアップロードされます。下流のストリーミングトランスフォーム(たとえば、Pipeline Builderparse_json)を使用してデータを解析してください。key 列にはメッセージと共に Kafka に記録されたキーが表示されます。メッセージにキーが含まれていない場合、その値は null になります。

パフォーマンスと制限

コネクタはソース Kafka トピックのパーティション数に関わらず、常に単一のコンシューマースレッドを使用します。

ストリーミング同期は一貫した長時間実行のジョブを目的としています。ストリーミング同期の中断は、期待される結果によっては停止の可能性があります。

現在、ストリーミング同期には次の制限があります:

  • エージェント接続からのジョブは、メンテナンスウィンドウ中(通常は週に 1 回)に再起動してアップグレードを適用します。予想されるダウンタイムは 5 分未満です。
  • 直接接続からのジョブは、少なくとも 48 時間ごとに再起動します。予想されるダウンタイムは一桁の分数です(リソースの利用可能性がジョブの即時再起動を許可する場合)。

パフォーマンス、帯域幅、および可用性を向上させるために、エージェント接続でストリーミング同期を実行することをお勧めします。

設定

  1. Data Connection アプリケーションを開き、画面右上の + New Source を選択します。
  2. 利用可能なコネクタタイプから Kafka を選択します。
  3. インターネットを介して 直接接続 するか、仲介エージェントを通じて接続 するかを選択します。
    • Kafka コネクタを正常に設定し、ダウンタイムを減らすために、ソースごとに 2 つのエージェントを使用することをお勧めします。エージェントのメンテナンスウィンドウが重ならないようにしてください。
  4. 以下のセクションの情報を使用して、コネクタのセットアップを続行するための追加設定プロンプトに従います。

Foundry での コネクタの設定 についてさらに学びましょう。

コネクタタイプのページに Kafka が表示されない場合は、Palantir サポートに連絡してアクセスを有効にしてください。

ブートストラップサーバー

パラメーター必須か?デフォルト説明
ブートストラップサーバーはいいいえKafka ブローカーサーバーを HOST:PORT の形式で 1 行ごとに追加します。

認証

Kafka 接続を認証するための資格情報方法を選択します: SSL、ユーザー名/パスワード、Azure AD、Kerberos、または NONE。

設定された資格情報は次の操作を許可する必要があります:

  • Topic リソース:
    • ストリーミング同期および探索のための Read
    • ストリーミングエクスポートのための Write

SSL

SSL 認証は、標準の Kafka SSL および SASL_SSL プロトコルに対応します。

SSL で認証するには、次の設定オプションを完了してください:

パラメーター必須か?デフォルト説明
エンドポイント識別アルゴリズムはいHTTPSHTTPS: ブローカーのホスト名がブローカーの証明書内のホスト名と一致することを確認します。
NONE: エンドポイント識別を無効にします。
双方向 SSL 有効いいえ無効相互 TLS (mTLS) を有効にします。追加の設定が必要です。
SASL を使用いいえいいえSASL 認証を有効にします。

相互 TLS (双方向 SSL) を有効にする場合は、プライベートキーパスワード を追加してください:

パラメーター必須か?デフォルト説明
SSL キーパスワードはいいいえプライベートキーを復号するために必要なパスワード。

SASL 認証を有効にする場合は、追加の設定を完了してください:

パラメーター必須か?デフォルト説明
SASL メカニズムいいえいいえ資格情報を暗号化するアルゴリズムを選択します。
saslJaasConfigUsernameはいいいえユーザー名
SASL JAAS 設定パスワードはいいいえパスワード
SASL クライアントコールバックハンドラークラスはいいいえSASL クライアントのデフォルトのコールバックハンドラーを表示します。SASL コールバックハンドラーについてさらに詳しくは Java SASL API ドキュメント ↗ を参照してください。

OAuth 2.0

OAuth 認証は OAuth 2.0 プロトコルを使用します。現在、クライアント資格情報グラントフローのみがサポートされています。

OAuth 2.0 プロトコルを使用して認証するには、次の設定オプションを完了してください:

パラメーター必須か?デフォルト説明
クライアント IDはいいいえ認証を要求するアプリケーションの ID
クライアントシークレットはいいいえサーバーとアプリケーション間の共有シークレット
トークンエンドポイント URIはいいいえアクセス/ID トークンを付与するサーバーの一意のリソース識別子 (URI)
スコープいいえいいえOAuth 経由で Kafka または特定の Kafka トピックに接続するには、スコープのコレクションを要求する必要がある場合があります。スコープは、認証プロバイダーで設定された任意の文字列値です。たとえば、コンシューマーは、Kafka へのアクセスレベルを決定するために認証要求で (kafka-topics-read、kafka-topics-list) のいずれかを要求する必要がある場合があります。
SASL 拡張いいえいいえたとえば Confluent Cloud などの一部の Kafka サーバーは、SASL 拡張 ↗ を設定する必要があります。これは、そのサーバープラットフォーム特有のキーと値のペアです。

Azure AD

Azure AD 認証モードは、Azure Event Hubs 用の Kafka インターフェースに適用されます。このモードには Azure AD サービスプリンシパルが必要です。サービスプリンシパルの作成方法および Event Hubs へのアクセス設定については、Azure ドキュメント ↗ を参照してください。

パラメーター必須か?デフォルト
テナント IDはいいいえ
クライアント IDはいいいえ
クライアントシークレットはいいいえ

Azure Event Hubs 用の Kafka インターフェースには、SAS トークン ↗ を使用してアクセスすることもできます。SAS トークンで認証するには、ユーザー名を $ConnectionString(引用符なし)として、パスワードに EventHubs 接続文字列を使用してユーザー名/パスワード認証を選択します。

None

Kafka の標準 PLAINTEXT プロトコルに対応します。

認証または SSL なしでコネクタを設定することは強くお勧めしません。これにより、コネクタと Kafka ブローカー間で暗号化されていないデータが送信されます。この設定は、安全なネットワーク内でのみ使用してください。

ネットワーキング

コネクタは Kafka ブローカーのホストにアクセスする必要があります。直接接続を使用する場合は、すべてのブートストラップサーバーホストに対して DNS イーグレスポリシーを作成します。

証明書とプライベートキー

SSL および TLS のために追加のクライアントまたはサーバー証明書およびプライベートキーを設定する必要がある場合があります。

SSL

SSL 接続はサーバー証明書を検証します。通常、SSL 検証は証明書チェーンを通じて行われます。デフォルトでは、エージェントおよび直接接続ランタイムの両方がほとんどの標準証明書チェーンを信頼します。ただし、接続先のサーバーに自己署名証明書がある場合、またはホスト名検証が接続を遮断する場合、コネクタは証明書を信頼する必要があります。使用する適切な証明書については、Kafka 管理者にお問い合わせください。

Data Connection で証明書を使用する方法 についてさらに学びましょう。

相互 TLS (mTLS)

ユーザーの Kafka クラスターは、サーバーとクライアントの両方が mTLS を通じて認証する必要がある場合があります。mTLS を有効にするには、次の設定を行う必要があります:

コネクタの実行タイプに基づいて、クライアントプライベートキーを設定するための手順に従います。

エージェント用クライアントプライベートキーの設定

エージェントを介して接続する場合は、プライベートキーを追加する 方法に関する指示に従います。

直接接続用クライアントプライベートキーの設定

直接接続する場合は、コネクタ設定ページの Configure client certificates and private key セクションにプライベートキーをアップロードします。表示されるポップアップでエイリアス kafka を使用し、プライベートキーとクライアント証明書を追加します。

クライアント証明書とプライベートキーを設定するオプションを表示するインターフェース

ユーザーがエイリアス、プライベートキー、クライアント証明書を入力できるポップアップ

Kafka からデータを同期する

Set up a streaming sync チュートリアルで Kafka との同期の設定方法を学びましょう。

スキーマレジストリ統合

Kafka スキーマレジストリは、すべてのスキーマのバージョン管理された履歴を保持する集中型ストレージシステムとして機能します。レジストリは AvroProtobuf、および JSON スキーマとの互換性を提供し、スキーマ形式とシリアル化されたデータ間の変換を容易にするために SerDes(シリアライザー/デシリアライザー)を使用します。

Kafka スキーマレジストリを効果的に活用するには、関連する Kafka トピックのスキーマを登録し、Schema Registry URL をソース設定に追加する必要があります。この調整により、コネクタは生のバイトを対応するデータ型に変換できるようになります。

たとえば、標準的な抽出では、通常、Kafka から Foundry に生のバイトが取り込まれるように設定されます。

標準のバイナリ Kafka 抽出

しかし、スキーマレジストリが設定されている場合、コネクタはバイトの基礎となるスキーマを識別し、それらを一流の Foundry 型に変換できます。

Avro Kafka 抽出

この機能により、面倒な型変換の必要性がなくなり、下流のパイプラインが大幅に簡素化されます。さらに、スキーマレジストリは集中型のスキーマ管理とスキーマ進化時の互換性チェックを提供するため、データの一貫性の保証が向上します。

データを Kafka にエクスポートする

コネクタは、Data Connection を介して外部の Kafka クラスターにストリームをエクスポートすることをサポートしています。

Kafka にエクスポートするには、まず Kafka コネクタの エクスポートを有効にする 必要があります。その後、新しいエクスポートを作成 します。

エクスポート設定オプション

オプション必須か?デフォルト説明
TopicはいN/Aエクスポート先の Kafka トピック。
Linger millisecondsはい0レコードを Kafka に送信する前に待機するミリ秒数。待機時間中に蓄積されたレコードは、エクスポート時にまとめてバッチ処理されます。この設定はデフォルトで 0 に設定されており、レコードは即座に送信されます。しかし、これにより Kafka インスタンスへのリクエストが増える可能性があります。
Key columnいいえ未定義Kafka にレコードを公開する際に Key として使用する Foundry ストリームの列。null キーはサポートされていないため、エクスポートされるストリーム内のすべてのレコードに対して選択された列が入力されていることを確認してください。
Value columnいいえ未定義エクスポートする Foundry ストリームの列。指定しない
Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 type: streaming-kafka-export-task # タイプ:ストリーミング-カフカ-エクスポート-タスク config: # 設定 maxParallelism: 1 # 最大並列性:1 topic: test-topic # トピック:テストトピック clientId: client-id # クライアントID:クライアント-id keyColumn: key # キーカラム:キー valueColumn: value # 値カラム:値 batchOptions: # バッチオプション: maxMilliseconds: 5000 # 最大ミリ秒:5000 maxRecords: 1000 # 最大レコード:1000 transforms: # 変換: transformType: test # 変換タイプ:テスト userCodeMavenCoords: [] # ユーザーコードMaven座標:[] useDirectReaders: false # ダイレクトリーダーを使用する:偽

ここではYAML形式でKafkaストリーミングエクスポートタスクの設定が定義されています。各設定はキーと値のペアで表され、その意味はコメントに日本語で説明されています。 エクスポートタスクを設定した後、右上の角にある Save を選択してください。