注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
Foundry を Kafka に接続して、Kafka キューから Foundry ストリームにリアルタイムでデータを読み込みます。
機能 | ステータス |
---|---|
ストリーミング同期 | 🟢 一般提供 |
データコネクションエクスポート | 🟢 一般提供 |
探索 | 🟢 一般提供 |
キー(バイナリ) | 値(バイナリ) |
---|---|
London | {"firstName": "John", "lastName": "Doe"} |
Paris | {"firstName": "Jean", "lastName": "DuPont"} |
Kafka コネクタはメッセージの内容を解析せず、Foundry に任意のタイプのデータを同期させることができます。すべての内容は、value
行の下にアップロードされ、未解析のままです。データを解析するには、下流のストリーミング変換(例:Pipeline Builderのparse_json
)を使用します。key
行には、メッセージとともに Kafka に記録されたキーが表示されます。メッセージにキーが含まれていない場合、値は null
になります。
コネクタは、ソースの Kafka トピックのパーティション数に関係なく、常に単一のコンシューマースレッドを使用します。
ストリーミング同期は、一貫性のある長期間実行されるジョブを意図しています。ストリーミング同期が中断されると、期待される結果に応じて潜在的なアウトエージが発生する可能性があります。
現在、ストリーミング同期には以下の制限があります。
ストリーミング同期の実行には、エージェント接続を使用することで、パフォーマンス、帯域幅、可用性が向上することをお勧めします。
Foundry でコネクタをセットアップする方法について詳しくは、こちらをご覧ください。
コネクタタイプのページで Kafka が表示されない場合は、Palantir の担当者に連絡してアクセスを有効にしてください。
パラメーター | 必須? | デフォルト | 説明 |
---|---|---|---|
URL | はい | なし | Kafka ブローカーの URL を 1 行に 1 つずつ追加してください。 |
Kafka 接続を認証するための資格情報方法を選択します:SSL、Azure AD、または NONE。
設定された資格情報は、以下の操作を許可する必要があります。
Topic
リソース:
Read
Write
SSL 認証は、標準の Kafka SSL
および SASL_SSL
プロトコルに対応しています。
SSL で認証するには、以下の構成オプションを完了してください。
パラメーター | 必須? | デフォルト | 説明 |
---|---|---|---|
エンドポイント識別アルゴリズム | はい | HTTPS | HTTPS :ブローカーのホスト名がブローカーの証明書のホスト名と一致することを確認します。NONE :エンドポイント識別を無効にします。 |
相互 SSL 有効化 | いいえ | 無効 | 相互 TLS (mTLS)を有効化します。追加の構成が必要です。 |
SASL の使用 | いいえ | いいえ | SASL 認証を有効にします。 |
相互 TLS(双方向 SSL)を有効にする場合は、プライベートキーのパスワードを追加してください。
パラメーター | 必須? | デフォルト | 説明 |
---|---|---|---|
SSL キーパスワード | はい | なし | プライベートキーを復号化するために必要なパスワード。 |
SASL 認証を有効にする場合は、追加の構成を完了してください。
パラメーター | 必須? | デフォルト | 説明 |
---|---|---|---|
SASL メカニズム | いいえ | いいえ | 資格情報を暗号化するアルゴリズムを選択します。 |
saslJaasConfigUsername | はい | なし | ユーザー名 |
SASL JAAS config password | はい | なし | パスワード |
SASL クライアントコールバックハンドラークラス | はい | なし | SASL クライアント用のデフォルトコールバックハンドラーを表示します。SASL コールバックハンドラーに関する詳細は、Java SASL API ドキュメントを参照してください。 |
Azure AD 認証モードは、Azure Event Hubs 用の Kafka インターフェースに適用されます。このモードでは、Azure AD サービスプリンシパルが必要です。サービスプリンシパルの作成方法と Event Hubs へのアクセス設定方法については、Azure ドキュメントを確認してください。
パラメーター | 必須? | デフォルト |
---|---|---|
テナント ID | はい | なし |
クライアント ID | はい | なし |
クライアントシークレット | はい | なし |
Kafka の標準 PLAINTEXT
プロトコルに対応しています。
認証や SSL なしでコネクタを設定することは強くお勧めしません。これにより、コネクタと Kafka ブローカー間で暗号化されていないデータが送信されます。この設定は、セキュアなネットワーク内でのみ使用してください。
コネクタは、KafkaブローカーのURLへのアクセスが必要です。直接接続を使用する場合は、すべてのブートストラップサーバーURLに対してDNSエグレスポリシーを作成します。
SSLやTLSの追加のクライアント証明書やサーバー証明書、プライベートキーを設定する必要があります。
SSL接続はサーバー証明書を検証します。通常、SSL検証は証明書チェーンを通じて行われます。デフォルトでは、エージェントと直接接続のランタイムは、ほとんどの標準的な証明書チェーンを信頼します。しかし、接続するサーバーが自己署名証明書を持っている場合、またはホスト名検証が接続を妨げる場合、コネクタはその証明書を信頼しなければなりません。使用する正しい証明書については、Kafka管理者に連絡してください。
Data Connectionでの証明書の使用についての詳細を学びましょう。
ユーザーのKafkaクラスタは、サーバーとクライアントの両方がmTLSを通じて認証することを求めるかもしれません。mTLSを有効にするには、以下のものを設定する必要があります:
コネクタの実行タイプに基づいて、クライアントのプライベートキーを設定するための以下の手順をフォローしてください。
下記のブートストラッパキーストアをコピーして、ホストの別の場所に保存します。ブートストラッパキーストアとトラストストアは、エージェントが再起動するたびに再生成されます。デフォルトのキーストアに行った変更はすべて、再起動時に上書きされます。
Copied!1 2
$ mkdir ~/security $ cp <bootvisor_root>/var/data/processes/<bootstrapper_dir>/var/conf/keyStore.jks ~/security/
Javaのkeytool
コマンドラインツールを使用して、顧客が提供したキーストアからキーをインポートし、コピーしたエージェントキーストアにインポートします。このツールがまだインストールされていない場合は、エージェントにバンドルされているJDKのbin
ディレクトリから見つけることができます。
Copied!1 2 3 4
$ keytool -importkeystore -srckeystore <kafka_keystore.jks> -destkeystore ~/security/keyStore.jks Importing keystore kafka_keystore.jks to keyStore.jks... Enter destination keystore password: keystore Enter source keystore password:
keytool -list
コマンドを使用して、キーがコピーされたキーストアに追加されたかどうかを確認することができます:
Copied!1 2 3 4 5 6 7 8 9 10 11
$ keytool -list -keystore ~/security/keyStore.jks Enter keystore password: Keystore type: jks Keystore provider: SUN Your keystore contains 2 entries kafka, 15-Dec-2022, PrivateKeyEntry, Certificate fingerprint (SHA-256): A5:B5:2F:1B:39:D3:DA:47:8B:6E:6A:DA:72:4B:0B:43:C7:2C:89:CD:0D:9D:03:B2:3F:35:7A:D4:7C:D3:3D:51 server, 15-Dec-2022, PrivateKeyEntry, Certificate fingerprint (SHA-256): DB:82:66:E8:09:43:30:9D:EF:0A:41:63:72:0C:2A:8D:F0:8A:C1:25:F7:89:B1:A3:6E:6F:C6:C5:2C:17:CB:B2
インポートされたキーのパスワードを更新するためにkeytool -keypasswd
コマンドを使用します。エージェントキーストアは、キーとキーストアのパスワードが一致することを要求します。
Copied!1 2
$ keytool -keypasswd -alias kafka -new keystore -keystore ~/security/keyStore.jks Enter keystore password:
Data Connectionで、コネクタエージェントに移動し、エージェント設定タブを開きます。設定の管理セクションで、詳細を選択し、エージェントタブを選択し、keyStore
を新しくコピーしたキーストアを指すように更新します。次に、keyStorePassword
を追加し、適切な値(デフォルトではkeystore
)を設定します。
Copied!1 2 3 4 5
security: keyStore: ~/security/keyStore.jks keyStorePassword: keystore trustStore: var/conf/trustStore.jks ...
最後に、エクスプローラータブを選択し、keyStorePath
とkeyStorePassword
の両方を更新します。新しい設定を保存します。
Copied!1 2 3 4 5
security: keyStorePath: ~/security/keyStore.jks keyStorePassword: keystore trustStorePath: var/conf/trustStore.jks ...
エージェントを再起動します。
エージェントタブの設定時にフィールド名はkeyStore
となり、エクスプローラータブではkeyStorePath
となることに注意してください。ブートストラッパの設定には変更は必要ありません。
コネクタ設定ページのクライアント証明書とプライベートキーの設定セクションで、プライベートキーをアップロードします。表示されるポップアップでエイリアスkafka
を使用し、プライベートキーとクライアント証明書を追加します。
ストリーミング同期の設定チュートリアルで、Kafkaとの同期の設定方法を学びましょう。
Kafkaスキーマレジストリは、すべてのスキーマのバージョン履歴を維持する中央集権型のストレージシステムとして機能します。レジストリは Avro
、 Protobuf
、および JSON
スキーマと互換性があり、SerDes(シリアライザ/デシリアライザ)を使用してスキーマ形式とシリアライズされたデータ間の変換を容易にします。
Kafkaスキーマレジストリを効果的に活用するには、関連する Kafka
トピックのスキーマを登録し、 スキーマレジストリURL
をソース設定に追加する必要があります。この調整により、コネクタは生のバイトを対応するデータ型に変換する能力を得ます。
たとえば、標準的な抽出では、通常、Kafkaからの生のバイトをFoundryに取り込むことになります。これは以下のように描かれています:
しかし、スキーマレジストリが設定されていると、コネクタはバイトの基本スキーマを識別し、それらを一等のFoundry型に変換することができます。以下に示すように:
この機能は、面倒な型変換の必要性を排除することで、下流のパイプラインを大幅に合理化することができます。さらに、スキーマレジストリが中央集権型のスキーマ管理と、スキーマが進化するにつれての互換性チェックを提供するため、データの一貫性保証を向上させます。
コネクタは、Data Connectionを介して外部Kafkaクラスタにストリームをエクスポートすることをサポートしています。
Kafkaへのエクスポートを開始するには、まずKafkaコネクタのエクスポートを有効にします。次に、新しいエクスポートを作成します。
オプション | 必須? | デフォルト | 説明 |
---|---|---|---|
Topic | はい | N/A | エクスポートしたいKafkaトピック。 |
Linger milliseconds | はい | 0 | Kafkaにレコードを送信する前に待つミリ秒数。待機時間中に累積したレコードは、エクスポート時にバッチ化されます。この設定はデフォルトで0に設定されており、レコードはすぐに送信されます。ただし、このデフォルト設定では、Kafkaインスタンスへのリクエストが増える可能性があります。 |
Key column | いいえ | 未定義 | Kafkaにレコードを公開する際にキーとして使用したいFoundryストリームの行。Nullキーはサポートされていません。選択した行がエクスポートされるストリームのすべてのレコードに対して入力されていることを確認してください。 |
Value column | いいえ | 未定義 | エクスポートしたいFoundryストリームの行。指定しない場合、行からのすべてのフィールドがバイトとしてシリアライズされ、メッセージの本文としてKafkaにエクスポートされます。 |
Enable Base64 Decode | いいえ | 無効 | Foundryストリームのバイナリデータは、内部的に保存されるときにBase64でエンコードされます。有効にすると、このフラグにより、エクスポートする前にバイナリデータがデコードされます。これは、 Key column と Value column の両方が指定されている場合にのみ有効にできます。 |
エクスポートタスクを通じてKafkaにエクスポートすることはお勧めしません。可能ならば、既存のエクスポートタスクは私たちが推奨するエクスポート機能を使用して移行すべきです。以下のドキュメンテーションは歴史的な参照のために提供されています。
エクスポートタスクを使用してデータのエクスポートを開始するには、エクスポートしたいKafkaコネクタを含むプロジェクトフォルダーに移動します。コネクタ名を右クリックし、 Create Data Connection Task
を選択します。
Data Connectionビューの左側のパネルで:
Source
名が使用したいKafkaコネクタと一致していることを確認します。Streaming dataset
タイプの Input
として dataset
を追加します。入力データセットはエクスポートされるFoundryデータセットです。Streaming export
タイプの Output
として dataset
を追加します。出力データセットはタスクの実行、スケジューリング、監視に使用されます。YAMLを作成するときに以下のオプションを使用します:
オプション | 必須? | デフォルト | 説明 |
---|---|---|---|
maxParallelism | はい | いいえ | データをエクスポートするために使用できる並列スレッドの最大数。実際のスレッド数は、入力Foundryストリームのパーティション数によって決まります( maxParallelism よりも少ない場合)。 |
topic | はい | いいえ | データがプッシュされるトピックの名前。 |
clientId | はい | エクスポートタスクに使用する識別子。識別子はKafka client.id にマップされます。詳細については、Kafkaドキュメンテーションを参照してください。 | |
batchOptions | はい | いいえ | 下記の batchOptions 設定を参照してください。 |
keyColumn | いいえ | いいえ | 入力ストリーミングデータセットの行の名前。この行の値はエクスポートメッセージのキーとして使用されます。このプロパティを省略すると、キーの値として null がエクスポートされます。 |
valueColumn | いいえ | いいえ | 入力ストリーミングデータセットの行の名前。この行の値はエクスポートメッセージの値として使用されます。このプロパティを省略すると、すべての行(文字列化されたJSONオブジェクトとして)が value フィールドの下にエクスポートされます。 |
enableIdempotence | いいえ | true | 詳細については、Kafkaドキュメンテーションを参照してください。 |
useDirectReaders | はい | いいえ | 常に false に設定します。例に従って設定します。 |
transforms | はい | いいえ | 下記の例の設定を参照してください。 |
以下のオプションを使用して batchOptions
を設定します:
オプション | 必須? | デフォルト | 説明 |
---|---|---|---|
maxMilliseconds | はい | いいえ | 出力トピックに書き込む前に待つ最大期間(ミリ秒)。この値を下げると遅延が減り、増やすとネットワークオーバーヘッド(リクエスト数)が減ります。この値は、バッチが maxRecords の制限に先に達した場合を除き、使用されます。 |
maxRecords | はい | いいえ | 出力トピックに書き込む前にバッファリングするメッセージの最大数。この値を下げると遅延が減り、増やすとネットワークオーバーヘッド(リクエスト数)が減ります。この値は、バッチが maxMilliseconds の制限に先に達した場合を除き、使用されます。 |
以下にエクスポートタスク設定の例を示します:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14
type: streaming-kafka-export-task # タスクのタイプを指定します。ここでは"streaming-kafka-export-task"を指定します。 config: # 設定パラメータを指定します。 maxParallelism: 1 # 最大並列処理数を指定します。ここでは1を指定します。 topic: test-topic # Kafkaのトピックを指定します。ここでは"test-topic"を指定します。 clientId: client-id # クライアントIDを指定します。ここでは"client-id"を指定します。 keyColumn: key # キーカラムを指定します。ここでは"key"を指定します。 valueColumn: value # バリューカラムを指定します。ここでは"value"を指定します。 batchOptions: # バッチオプションを指定します。 maxMilliseconds: 5000 # 最大ミリ秒数を指定します。ここでは5000を指定します。 maxRecords: 1000 # 最大レコード数を指定します。ここでは1000を指定します。 transforms: # 変換を指定します。 transformType: test # 変換タイプを指定します。ここでは"test"を指定します。 userCodeMavenCoords: [] # ユーザーコードのMaven座標を指定します。ここでは空配列を指定します。 useDirectReaders: false # 直接のリーダーの使用を指定します。ここでは"false"を指定します。
エクスポートタスクを設定した後、右上の角にある保存を選択してください。