注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
Foundry を Kafka に接続して、Kafka キューから Foundry ストリームにリアルタイムでデータを読み込みます。
機能 | ステータス |
---|---|
探索 | 🟢 一般利用可能 |
ストリーミング同期 | 🟢 一般利用可能 |
ストリーミングエクスポート | 🟢 一般利用可能 |
key (binary) | value (binary) |
---|---|
London | {"firstName": "John", "lastName": "Doe"} |
Paris | {"firstName": "Jean", "lastName": "DuPont"} |
Kafka コネクタはメッセージ内容を解析せず、任意のタイプのデータを Foundry に同期できます。すべての内容は解析されずに value
列の下にアップロードされます。下流のストリーミングトランスフォーム(たとえば、Pipeline Builder の parse_json
)を使用してデータを解析してください。key
列にはメッセージと共に Kafka に記録されたキーが表示されます。メッセージにキーが含まれていない場合、その値は null
になります。
コネクタはソース Kafka トピックのパーティション数に関わらず、常に単一のコンシューマースレッドを使用します。
ストリーミング同期は一貫した長時間実行のジョブを目的としています。ストリーミング同期の中断は、期待される結果によっては停止の可能性があります。
現在、ストリーミング同期には次の制限があります:
パフォーマンス、帯域幅、および可用性を向上させるために、エージェント接続でストリーミング同期を実行することをお勧めします。
Foundry での コネクタの設定 についてさらに学びましょう。
コネクタタイプのページに Kafka が表示されない場合は、Palantir サポートに連絡してアクセスを有効にしてください。
パラメーター | 必須か? | デフォルト | 説明 |
---|---|---|---|
ブートストラップサーバー | はい | いいえ | Kafka ブローカーサーバーを HOST:PORT の形式で 1 行ごとに追加します。 |
Kafka 接続を認証するための資格情報方法を選択します: SSL、ユーザー名/パスワード、Azure AD、Kerberos、または NONE。
設定された資格情報は次の操作を許可する必要があります:
Topic
リソース:
Read
Write
SSL 認証は、標準の Kafka SSL
および SASL_SSL
プロトコルに対応します。
SSL で認証するには、次の設定オプションを完了してください:
パラメーター | 必須か? | デフォルト | 説明 |
---|---|---|---|
エンドポイント識別アルゴリズム | はい | HTTPS | HTTPS : ブローカーのホスト名がブローカーの証明書内のホスト名と一致することを確認します。NONE : エンドポイント識別を無効にします。 |
双方向 SSL 有効 | いいえ | 無効 | 相互 TLS (mTLS) を有効にします。追加の設定が必要です。 |
SASL を使用 | いいえ | いいえ | SASL 認証を有効にします。 |
相互 TLS (双方向 SSL) を有効にする場合は、プライベートキーパスワード を追加してください:
パラメーター | 必須か? | デフォルト | 説明 |
---|---|---|---|
SSL キーパスワード | はい | いいえ | プライベートキーを復号するために必要なパスワード。 |
SASL 認証を有効にする場合は、追加の設定を完了してください:
パラメーター | 必須か? | デフォルト | 説明 |
---|---|---|---|
SASL メカニズム | いいえ | いいえ | 資格情報を暗号化するアルゴリズムを選択します。 |
saslJaasConfigUsername | はい | いいえ | ユーザー名 |
SASL JAAS 設定パスワード | はい | いいえ | パスワード |
SASL クライアントコールバックハンドラークラス | はい | いいえ | SASL クライアントのデフォルトのコールバックハンドラーを表示します。SASL コールバックハンドラーについてさらに詳しくは Java SASL API ドキュメント ↗ を参照してください。 |
OAuth 認証は OAuth 2.0 プロトコルを使用します。現在、クライアント資格情報グラントフローのみがサポートされています。
OAuth 2.0 プロトコルを使用して認証するには、次の設定オプションを完了してください:
パラメーター | 必須か? | デフォルト | 説明 |
---|---|---|---|
クライアント ID | はい | いいえ | 認証を要求するアプリケーションの ID |
クライアントシークレット | はい | いいえ | サーバーとアプリケーション間の共有シークレット |
トークンエンドポイント URI | はい | いいえ | アクセス/ID トークンを付与するサーバーの一意のリソース識別子 (URI) |
スコープ | いいえ | いいえ | OAuth 経由で Kafka または特定の Kafka トピックに接続するには、スコープのコレクションを要求する必要がある場合があります。スコープは、認証プロバイダーで設定された任意の文字列値です。たとえば、コンシューマーは、Kafka へのアクセスレベルを決定するために認証要求で (kafka-topics-read、kafka-topics-list) のいずれかを要求する必要がある場合があります。 |
SASL 拡張 | いいえ | いいえ | たとえば Confluent Cloud などの一部の Kafka サーバーは、SASL 拡張 ↗ を設定する必要があります。これは、そのサーバープラットフォーム特有のキーと値のペアです。 |
Azure AD 認証モードは、Azure Event Hubs 用の Kafka インターフェースに適用されます。このモードには Azure AD サービスプリンシパルが必要です。サービスプリンシパルの作成方法および Event Hubs へのアクセス設定については、Azure ドキュメント ↗ を参照してください。
パラメーター | 必須か? | デフォルト |
---|---|---|
テナント ID | はい | いいえ |
クライアント ID | はい | いいえ |
クライアントシークレット | はい | いいえ |
Azure Event Hubs 用の Kafka インターフェースには、SAS トークン ↗ を使用してアクセスすることもできます。SAS トークンで認証するには、ユーザー名を $ConnectionString
(引用符なし)として、パスワードに EventHubs 接続文字列を使用してユーザー名/パスワード認証を選択します。
Kafka の標準 PLAINTEXT
プロトコルに対応します。
認証または SSL なしでコネクタを設定することは強くお勧めしません。これにより、コネクタと Kafka ブローカー間で暗号化されていないデータが送信されます。この設定は、安全なネットワーク内でのみ使用してください。
コネクタは Kafka ブローカーのホストにアクセスする必要があります。直接接続を使用する場合は、すべてのブートストラップサーバーホストに対して DNS イーグレスポリシーを作成します。
SSL および TLS のために追加のクライアントまたはサーバー証明書およびプライベートキーを設定する必要がある場合があります。
SSL 接続はサーバー証明書を検証します。通常、SSL 検証は証明書チェーンを通じて行われます。デフォルトでは、エージェントおよび直接接続ランタイムの両方がほとんどの標準証明書チェーンを信頼します。ただし、接続先のサーバーに自己署名証明書がある場合、またはホスト名検証が接続を遮断する場合、コネクタは証明書を信頼する必要があります。使用する適切な証明書については、Kafka 管理者にお問い合わせください。
Data Connection で証明書を使用する方法 についてさらに学びましょう。
ユーザーの Kafka クラスターは、サーバーとクライアントの両方が mTLS を通じて認証する必要がある場合があります。mTLS を有効にするには、次の設定を行う必要があります:
コネクタの実行タイプに基づいて、クライアントプライベートキーを設定するための手順に従います。
エージェントを介して接続する場合は、プライベートキーを追加する 方法に関する指示に従います。
直接接続する場合は、コネクタ設定ページの Configure client certificates and private key セクションにプライベートキーをアップロードします。表示されるポップアップでエイリアス kafka
を使用し、プライベートキーとクライアント証明書を追加します。
Set up a streaming sync チュートリアルで Kafka との同期の設定方法を学びましょう。
Kafka スキーマレジストリは、すべてのスキーマのバージョン管理された履歴を保持する集中型ストレージシステムとして機能します。レジストリは Avro
、Protobuf
、および JSON
スキーマとの互換性を提供し、スキーマ形式とシリアル化されたデータ間の変換を容易にするために SerDes(シリアライザー/デシリアライザー)を使用します。
Kafka スキーマレジストリを効果的に活用するには、関連する Kafka
トピックのスキーマを登録し、Schema Registry URL
をソース設定に追加する必要があります。この調整により、コネクタは生のバイトを対応するデータ型に変換できるようになります。
たとえば、標準的な抽出では、通常、Kafka から Foundry に生のバイトが取り込まれるように設定されます。
しかし、スキーマレジストリが設定されている場合、コネクタはバイトの基礎となるスキーマを識別し、それらを一流の Foundry 型に変換できます。
この機能により、面倒な型変換の必要性がなくなり、下流のパイプラインが大幅に簡素化されます。さらに、スキーマレジストリは集中型のスキーマ管理とスキーマ進化時の互換性チェックを提供するため、データの一貫性の保証が向上します。
コネクタは、Data Connection を介して外部の Kafka クラスターにストリームをエクスポートすることをサポートしています。
Kafka にエクスポートするには、まず Kafka コネクタの エクスポートを有効にする 必要があります。その後、新しいエクスポートを作成 します。
オプション | 必須か? | デフォルト | 説明 |
---|---|---|---|
Topic | はい | N/A | エクスポート先の Kafka トピック。 |
Linger milliseconds | はい | 0 | レコードを Kafka に送信する前に待機するミリ秒数。待機時間中に蓄積されたレコードは、エクスポート時にまとめてバッチ処理されます。この設定はデフォルトで 0 に設定されており、レコードは即座に送信されます。しかし、これにより Kafka インスタンスへのリクエストが増える可能性があります。 |
Key column | いいえ | 未定義 | Kafka にレコードを公開する際に Key として使用する Foundry ストリームの列。null キーはサポートされていないため、エクスポートされるストリーム内のすべてのレコードに対して選択された列が入力されていることを確認してください。 |
Value column | いいえ | 未定義 | エクスポートする Foundry ストリームの列。指定しない |
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14
type: streaming-kafka-export-task # タイプ:ストリーミング-カフカ-エクスポート-タスク config: # 設定 maxParallelism: 1 # 最大並列性:1 topic: test-topic # トピック:テストトピック clientId: client-id # クライアントID:クライアント-id keyColumn: key # キーカラム:キー valueColumn: value # 値カラム:値 batchOptions: # バッチオプション: maxMilliseconds: 5000 # 最大ミリ秒:5000 maxRecords: 1000 # 最大レコード:1000 transforms: # 変換: transformType: test # 変換タイプ:テスト userCodeMavenCoords: [] # ユーザーコードMaven座標:[] useDirectReaders: false # ダイレクトリーダーを使用する:偽
ここではYAML形式でKafkaストリーミングエクスポートタスクの設定が定義されています。各設定はキーと値のペアで表され、その意味はコメントに日本語で説明されています。 エクスポートタスクを設定した後、右上の角にある Save を選択してください。