Connect Foundry to Kafka to read data from a Kafka queue into a Foundry stream in realtime.
Capability | Status |
---|---|
Exploration | 🟢 Generally available |
Streaming syncs | 🟢 Generally available |
Streaming exports | 🟢 Generally available |
key (binary) | value (binary) |
---|---|
London | {"firstName": "John", "lastName": "Doe"} |
Paris | {"firstName": "Jean", "lastName": "DuPont"} |
The Kafka connector does not parse message contents, and data of any type can be synced into Foundry. All content is uploaded, unparsed, under the `value` column. Use a downstream streaming transform (for example, `parse_json` in Pipeline Builder) to parse the data. The `key` column will display the key that was recorded in Kafka along with the message. If the message does not include a key, the value in the `key` column will be null.
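As a non-authoritative illustration of that downstream parsing step, the sketch below uses plain PySpark (not Pipeline Builder's `parse_json` transform itself) to turn raw `value` bytes like those in the example above into typed columns. The schema and column names are assumptions made for this example.

```python
# Minimal sketch, assuming the synced stream is available as a DataFrame with
# binary `key` and `value` columns; the JSON schema below is illustrative.
from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.getOrCreate()

person_schema = T.StructType([
    T.StructField("firstName", T.StringType()),
    T.StructField("lastName", T.StringType()),
])

# Stand-in for the synced stream shown in the table above.
raw = spark.createDataFrame(
    [("London", bytearray(b'{"firstName": "John", "lastName": "Doe"}'))],
    schema="key string, value binary",
)

parsed = raw.select(
    F.col("key"),
    F.from_json(F.col("value").cast("string"), person_schema).alias("person"),
).select("key", "person.firstName", "person.lastName")

parsed.show()
```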
The connector always uses a single consumer thread regardless of the number of partitions on the source Kafka topic.
Streaming syncs are meant to be consistent, long-running jobs. Any interruption to a streaming sync is a potential outage depending on expected outcomes.
Currently, streaming syncs have the following limitations:
We recommend running streaming syncs on an agent connection for improved performance, bandwidth, and availability.
Learn more about setting up a connector in Foundry.
If you do not see Kafka on the connector type page, contact Palantir Support to enable access.
Parameter | Required? | Default | Description |
---|---|---|---|
Bootstrap servers | Yes | No | Add Kafka broker servers in the format HOST:PORT, one per line. |
Select a credential method to authenticate your Kafka connection: SSL, Username/Password, Azure AD, Kerberos, or NONE.
Configured credentials must allow the following operations on the `Topic` resource:
- `Read` for streaming syncs and exploration
- `Write` for streaming exports

SSL authentication corresponds to the standard Kafka `SSL` and `SASL_SSL` protocols.
To authenticate with SSL, complete the following configuration options:
Parameter | Required? | Default | Description |
---|---|---|---|
Endpoint identification algorithm | Yes | HTTPS | HTTPS: Verify that the broker host name matches the host name in the broker's certificate. NONE: Disable endpoint identification. |
Two-way SSL enabled | No | Disabled | Enable mutual TLS (mTLS); additional configuration required. |
Use SASL | No | No | Enable SASL authentication. |
If enabling mutual TLS (two-way SSL), add your private key password:
Parameter | Required? | Default | Description |
---|---|---|---|
SSL key password | Yes | No | Password required to decrypt private key. |
If enabling SASL authentication, complete additional configuration:
Parameter | Required? | Default | Description |
---|---|---|---|
SASL mechanism | No | No | Select the algorithm with which to encrypt credentials. |
SASL JAAS config username | Yes | No | Username |
SASL JAAS config password | Yes | No | Password |
SASL client callback handler class | Yes | No | Shows the default callback handler for SASL clients. See the Java SASL API documentation ↗ for more information about SASL callback handlers. |
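For reference only, the following sketch shows roughly equivalent SASL_SSL settings in a standalone `confluent-kafka` consumer, which can be useful for sanity-checking credentials outside Foundry. The broker address, topic, credentials, and mechanism are placeholders, and the property names follow librdkafka conventions rather than the Foundry parameter names.

```python
# Hedged sketch: verify SASL_SSL credentials with a plain Kafka client.
from confluent_kafka import Consumer

conf = {
    "bootstrap.servers": "broker-1.example.com:9093",   # placeholder
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "SCRAM-SHA-256",                 # or PLAIN, per your broker
    "sasl.username": "my-user",
    "sasl.password": "my-password",
    # Mirrors the "Endpoint identification algorithm" parameter above.
    "ssl.endpoint.identification.algorithm": "https",
    "group.id": "connectivity-check",
    "auto.offset.reset": "earliest",
}

consumer = Consumer(conf)
consumer.subscribe(["my-topic"])                        # placeholder topic
msg = consumer.poll(timeout=10.0)
if msg is None:
    print("no message received")
elif msg.error():
    print("consumer error:", msg.error())
else:
    print(msg.key(), msg.value())
consumer.close()
```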
OAuth authentication uses the OAuth 2.0 protocol. Only the Client Credentials Grant flow is currently supported.
To use the OAuth 2.0 protocol for authentication, complete the following configuration options:
Parameter | Required? | Default | Description |
---|---|---|---|
Client ID | Yes | No | The ID of your application requesting authentication |
Client Secret | Yes | No | The shared secret between the server and your application |
Token Endpoint URI | Yes | No | The Uniform Resource Identifier (URI) of the server that grants access/ID tokens |
Scopes | No | No | Connecting via OAuth to Kafka, or specific Kafka topics, may require requesting a collection of scopes. Scopes are arbitrary string values configured in the authentication provider. For example, consumers may need to request one of (kafka-topics-read, kafka-topics-list) in their authentication request to determine the level of access to Kafka they receive. |
SASL Extensions | No | No | Some Kafka servers, for example Confluent Cloud, may require SASL extensions ↗ to be configured, which would be key-value pairs specific to that server platform. |
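As a rough point of comparison, the sketch below expresses the same client-credentials flow as librdkafka OIDC properties in a standalone `confluent-kafka` client. All values are placeholders, the mapping to the Foundry parameters above is annotated in comments, and exact property names may vary by client version.

```python
# Hedged sketch: OAuth 2.0 client-credentials settings for a plain Kafka client.
from confluent_kafka import Consumer

conf = {
    "bootstrap.servers": "broker-1.example.com:9093",                         # placeholder
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "OAUTHBEARER",
    "sasl.oauthbearer.method": "oidc",
    "sasl.oauthbearer.client.id": "my-client-id",                             # Client ID
    "sasl.oauthbearer.client.secret": "my-client-secret",                     # Client Secret
    "sasl.oauthbearer.token.endpoint.url": "https://auth.example.com/token",  # Token Endpoint URI
    "sasl.oauthbearer.scope": "kafka-topics-read",                            # Scopes
    "sasl.oauthbearer.extensions": "logicalCluster=lkc-xxxx",                 # SASL Extensions
    "group.id": "connectivity-check",
}

consumer = Consumer(conf)  # connects lazily; poll() to exercise the auth flow
```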
The Azure AD authentication mode applies to the Kafka interface for Azure Event Hubs. This mode requires an Azure AD service principal. Review the Azure documentation ↗ to learn how to create a service principal and set up access to Event Hubs.
Parameter | Required? | Default |
---|---|---|
Tenant ID | Yes | No |
Client ID | Yes | No |
Client secret | Yes | No |
The Kafka interface for Azure Event Hubs can also be accessed through a SAS Token ↗. To authenticate with a SAS token, select Username/Password authentication, using `$ConnectionString` (no quotes) as the username and your Event Hubs connection string as the password.
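To make the username/password mapping concrete, here is a hedged sketch of the same convention in a standalone `confluent-kafka` client; the namespace, port, and connection string are placeholders.

```python
# Hedged sketch: SAS-token authentication against Event Hubs' Kafka interface.
from confluent_kafka import Consumer

conf = {
    "bootstrap.servers": "my-namespace.servicebus.windows.net:9093",  # placeholder namespace
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "$ConnectionString",                             # literal value
    "sasl.password": (
        "Endpoint=sb://my-namespace.servicebus.windows.net/;"
        "SharedAccessKeyName=my-policy;SharedAccessKey=<key>"         # placeholder connection string
    ),
    "group.id": "connectivity-check",
}

consumer = Consumer(conf)
```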
Corresponds to Kafka's standard `PLAINTEXT` protocol.
We strongly discourage configuring the connector without authentication or SSL, as this passes unencrypted data between the connector and the Kafka broker. Only use this configuration within secure networks.
The connector must have access to the host of the Kafka broker. If using a direct connection, create DNS egress policies for all bootstrap server hosts.
You may need to configure additional client or server certificates and private keys for SSL and TLS.
SSL connections validate server certificates. Normally, SSL validation occurs through a certificate chain; by default, both agent and direct connection runtimes trust most standard certificate chains. However, if the server to which you are connecting uses a self-signed certificate, or if the connection is intercepted in a way that interferes with hostname validation, the connector must explicitly trust the certificate. Contact your Kafka administrator for the right certificate to use.
Learn more about using certificates in Data Connection.
Your Kafka cluster might require that both the server and client authenticate through mTLS. To enable mTLS, follow the steps below to configure a client private key based on the connector run type:
- If connecting via an agent, follow the instructions on how to add a private key.
- If connecting directly, upload your private key in the Configure client certificates and private key section of the connector configuration page. Use the alias `kafka` in the pop-up that appears, then add the private key and client certificate.
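For comparison, this is roughly what two-way SSL looks like from a standalone `confluent-kafka` client; the file paths and key password are placeholders and do not correspond to Foundry configuration fields.

```python
# Hedged sketch: mutual TLS (two-way SSL) settings for a plain Kafka client.
from confluent_kafka import Consumer

conf = {
    "bootstrap.servers": "broker-1.example.com:9093",   # placeholder
    "security.protocol": "SSL",
    "ssl.ca.location": "/path/to/ca.pem",               # broker certificate chain to trust
    "ssl.certificate.location": "/path/to/client.pem",  # client certificate
    "ssl.key.location": "/path/to/client.key",          # client private key
    "ssl.key.password": "my-key-password",              # analogous to "SSL key password" above
    "group.id": "connectivity-check",
}

consumer = Consumer(conf)
```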
Learn how to set up a sync with Kafka in the Set up a streaming sync tutorial.
The Kafka Schema Registry functions as a centralized storage system, maintaining a versioned history of all schemas. The registry supports Avro, Protobuf, and JSON schemas, and uses SerDes (serializer/deserializer) implementations to convert between schema formats and serialized data.
To use the Kafka Schema Registry, register schemas for the relevant Kafka topics and add the Schema Registry URL to the source configuration. This allows the connector to transform raw bytes into the corresponding data types.
For instance, a standard extraction would typically ingest raw bytes from Kafka into Foundry, as depicted below:
However, with the Schema Registry configured, the connector can discern the underlying schema of the bytes and transform them into first-class Foundry types, as shown here:
This feature can significantly streamline downstream pipelines by eliminating the need for cumbersome type conversion. Moreover, it elevates data consistency guarantees, given that the Schema Registry offers centralized schema management and compatibility checks as schemas evolve.
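The sketch below shows one way a producer might publish typed Avro records using the `confluent-kafka` Schema Registry serializer (which registers the schema automatically by default), so that a registry-aware reader, such as a sync configured with the Schema Registry URL, can decode the bytes into typed fields. The registry URL, topic, and schema are placeholders, and this is not Foundry-specific code.

```python
# Hedged sketch: produce Avro records; the serializer registers the schema
# with the registry automatically by default.
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

schema_str = """
{
  "type": "record",
  "name": "Person",
  "fields": [
    {"name": "firstName", "type": "string"},
    {"name": "lastName", "type": "string"}
  ]
}
"""

registry = SchemaRegistryClient({"url": "https://schema-registry.example.com"})  # placeholder
producer = SerializingProducer({
    "bootstrap.servers": "broker-1.example.com:9092",                            # placeholder
    "key.serializer": StringSerializer("utf_8"),
    "value.serializer": AvroSerializer(registry, schema_str),
})

producer.produce(topic="people", key="London",
                 value={"firstName": "John", "lastName": "Doe"})
producer.flush()
```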
The connector supports exporting streams to external Kafka clusters via Data Connection.
To export to Kafka, first enable exports for your Kafka connector. Then, create a new export.
Option | Required? | Default | Description |
---|---|---|---|
Topic | Yes | N/A | The Kafka topic to which you want to export. |
Linger milliseconds | Yes | 0 | The number of milliseconds to wait before sending records to Kafka. Records that accrue during the waiting time will be batched together when exported. This setting defaults to 0, meaning that records will be sent immediately. However, this default may result in more requests to your Kafka instance. |
Key column | No | Undefined | The column from the Foundry stream that you wish to use as the Key when publishing records to Kafka. Null keys are not supported; ensure the selected column is populated for all records in the stream being exported. |
Value column | No | Undefined | The column from the Foundry stream that you wish to export. If not specified, all fields from the row will be serialized as bytes and exported to Kafka in the body of the message. |
Enable Base64 Decode | No | Disabled | Binary data in Foundry streams is Base64 encoded when stored internally. When enabled, this flag will result in binary data being decoded before exporting. This may only be enabled if both Key column and Value column are specified. |
We do not recommend exporting to Kafka through export tasks. If possible, existing export tasks should be migrated to use our recommended export capability. The documentation below is intended for historical reference.
To begin exporting data using export tasks, navigate to the Project folder that contains the Kafka connector to which you want to export. Right-click the connector name, then select Create Data Connection Task.
In the left side panel of the Data Connection view:
- Verify that the Source name matches the Kafka connector you want to use.
- Add an Input called `dataset` of type Streaming dataset. The input dataset is the Foundry dataset being exported.
- Add an Output called `dataset` of type Streaming export. The output dataset is used to run, schedule, and monitor the task.

Use the following options when creating the YAML:
Option | Required? | Default | Description |
---|---|---|---|
maxParallelism | Yes | No | The maximum allowed number of parallel threads used to export data. Actual number of threads is dictated by the number of partitions on the input Foundry stream (if lower than maxParallelism ). |
topic | Yes | No | The name of the topic to which data is pushed. |
clientId | Yes | No | The identifier to use for the export task. The identifier maps to the Kafka client.id. Review the Kafka documentation ↗ for more information. |
batchOptions | Yes | No | See batchOptions configuration below. |
keyColumn | No | No | Name of a column in input streaming dataset. Values in this column are used as the key in exported messages. Omitting this property will export null values for the key. |
valueColumn | No | No | Name of a column in input streaming dataset. Values in this column are used as the value in exported messages. Omitting this property will export all columns (as a stringified JSON object) under the value field. |
enableIdempotence | No | true | Review the Kafka documentation ↗ for more information. |
useDirectReaders | Yes | No | Always set to false. Configure as shown in the example below. |
transforms | Yes | No | See the example configuration below. |
Configure `batchOptions` using the following options:
Option | Required? | Default | Description |
---|---|---|---|
maxMilliseconds | Yes | No | The maximum duration (in milliseconds) to wait before writing available rows to the output topic. Lower this value to reduce latency, or increase it to reduce network overhead (fewer requests). This value is used unless the batch hits the maxRecords limit first. |
maxRecords | Yes | No | The maximum number of messages to buffer before writing to the output topic. Lower this value to reduce latency, or increase it to reduce network overhead (fewer requests). This value is used unless the batch hits the maxMilliseconds limit first. |
The following shows an example export task configuration:
```yaml
type: streaming-kafka-export-task
config:
  maxParallelism: 1
  topic: test-topic
  clientId: client-id
  keyColumn: key
  valueColumn: value
  batchOptions:
    maxMilliseconds: 5000
    maxRecords: 1000
  transforms:
    transformType: test
    userCodeMavenCoords: []
  useDirectReaders: false
```
After you configure the export task, select Save in the upper-right corner.