Kafka

Connect Foundry to Kafka to read data from a Kafka queue into a Foundry stream in real time.

Supported capabilities

Capability | Status
Exploration | 🟢 Generally available
Streaming syncs | 🟢 Generally available
Streaming exports | 🟢 Generally available

Data model

key (binary) | value (binary)
London | {"firstName": "John", "lastName": "Doe"}
Paris | {"firstName": "Jean", "lastName": "DuPont"}

The Kafka connector does not parse message contents; data of any type can be synced into Foundry. All content is uploaded, unparsed, to the value column. Use a downstream streaming transform (for example, parse_json in Pipeline Builder) to parse the data. The key column displays the key that was recorded in Kafka along with the message. If the message does not include a key, the key column will be null for that record.
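
As an illustrative sketch (not actual connector output), the first row above arrives with its JSON payload as a single unparsed string in the value column; a downstream parse step such as parse_json can then expand it into typed fields:

# As ingested by the streaming sync:
key: "London"
value: '{"firstName": "John", "lastName": "Doe"}'

# After parsing the value column downstream:
key: "London"
firstName: "John"
lastName: "Doe"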

Performance and limitations

The connector always uses a single consumer thread regardless of the number of partitions on the source Kafka topic.

Streaming syncs are meant to be consistent, long-running jobs. Depending on your expected outcomes, any interruption to a streaming sync may constitute an outage.

Currently, streaming syncs have the following limitations:

  • Jobs from agent connections restart during maintenance windows (typically once a week) to pick up upgrades. Expected downtime is less than five minutes.
  • Jobs from direct connections restart at least once every 48 hours. Expected downtime is single-digit minutes (assuming resource availability allows jobs to restart immediately).

We recommend running streaming syncs on an agent connection for improved performance, bandwidth, and availability.

Setup

  1. Open the Data Connection application and select + New Source in the upper right corner of the screen.
  2. Select Kafka from the available connector types.
  3. Choose to use a direct connection over the Internet or to connect through an intermediary agent.
    • We recommend connecting through two agents per source to successfully set up your Kafka connector and reduce downtime. Be sure the agents do not have overlapping maintenance windows.
  4. Follow the additional configuration prompts to continue setting up your connector using the information in the sections below.

Learn more about setting up a connector in Foundry.

If you do not see Kafka on the connector type page, contact Palantir Support to enable access.

Bootstrap servers

Parameter | Required? | Default | Description
Bootstrap servers | Yes | No | Add Kafka broker servers in the format HOST:PORT, one per line.
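
For example, a three-broker cluster (hypothetical host names) would be entered as:

broker-1.example.com:9092
broker-2.example.com:9092
broker-3.example.com:9092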

Authentication

Select a credential method to authenticate your Kafka connection: SSL, Username/Password, Azure AD, Kerberos, or NONE.

Configured credentials must allow the following operations (an example grant is sketched after this list):

  • Topic resource:
    • Read for streaming syncs and exploration
    • Write for streaming exports
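
As a sketch of how such permissions might be granted on a self-managed cluster, Kafka's standard kafka-acls tool can add Read and Write ACLs for the connector's principal; the principal, topic, and broker below are hypothetical, and managed platforms may handle access control differently:

# Allow reads for streaming syncs and exploration
kafka-acls.sh --bootstrap-server broker-1.example.com:9092 \
  --add --allow-principal User:foundry-connector \
  --operation Read --topic my-topic

# Allow writes for streaming exports
kafka-acls.sh --bootstrap-server broker-1.example.com:9092 \
  --add --allow-principal User:foundry-connector \
  --operation Write --topic my-topic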

SSL

SSL authentication corresponds to the standard Kafka SSL and SASL_SSL protocols.

To authenticate with SSL, complete the following configuration options:

Parameter | Required? | Default | Description
Endpoint identification algorithm | Yes | HTTPS | HTTPS: Verify that the broker host name matches the host name in the broker's certificate. NONE: Disable endpoint identification.
Two-way SSL enabled | No | Disabled | Enable mutual TLS (mTLS); additional configuration required.
Use SASL | No | No | Enable SASL authentication.
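
For orientation, these options correspond to the standard Kafka client SSL settings; a minimal sketch of the equivalent client properties (not the connector's internal configuration) looks like this:

security.protocol=SSL
# HTTPS option above: verify the broker host name against its certificate
ssl.endpoint.identification.algorithm=https
# NONE option above: an empty value disables endpoint identification
# ssl.endpoint.identification.algorithm=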

If enabling mutual TLS (two-way SSL), add your private key password:

Parameter | Required? | Default | Description
SSL key password | Yes | No | Password required to decrypt the private key.

If enabling SASL authentication, complete additional configuration:

Parameter | Required? | Default | Description
SASL mechanism | No | No | Select the algorithm with which to encrypt credentials.
SASL JAAS config username | Yes | No | Username
SASL JAAS config password | Yes | No | Password
SASL client callback handler class | Yes | No | Shows the default callback handler for SASL clients. See the Java SASL API documentation ↗ for more information about SASL callback handlers.
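
These fields map onto standard Kafka SASL client settings. A minimal, hedged sketch of equivalent client properties using the SCRAM-SHA-512 mechanism and placeholder credentials is shown below; the configuration the connector actually generates may differ:

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="foundry-connector" \
  password="<password>";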

OAuth 2.0

OAuth authentication uses the OAuth 2.0 protocol. Only the Client Credentials Grant flow is currently supported.

To use the OAuth 2.0 protocol for authentication, complete the following configuration options:

Parameter | Required? | Default | Description
Client ID | Yes | No | The ID of your application requesting authentication.
Client Secret | Yes | No | The shared secret between the server and your application.
Token Endpoint URI | Yes | No | The Uniform Resource Identifier (URI) of the server that grants access/ID tokens.
Scopes | No | No | Connecting via OAuth to Kafka, or to specific Kafka topics, may require requesting a collection of scopes. Scopes are arbitrary string values configured in the authentication provider. For example, consumers may need to request one of (kafka-topics-read, kafka-topics-list) in their authentication request to determine the level of access to Kafka they receive.
SASL Extensions | No | No | Some Kafka servers, such as Confluent Cloud, may require SASL extensions ↗ to be configured; these are key-value pairs specific to that server platform.
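
For context, the Client Credentials Grant flow corresponds to Kafka's standard OAUTHBEARER support. A hedged sketch of equivalent client properties with a hypothetical token endpoint, client ID, and scope follows; the callback handler class path varies by Kafka client version:

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://auth.example.com/oauth2/token
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  clientId="my-client-id" \
  clientSecret="<secret>" \
  scope="kafka-topics-read";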

Azure AD

The Azure AD authentication mode applies to the Kafka interface for Azure Event Hubs. This mode requires an Azure AD service principal. Review the Azure documentation ↗ to learn how to create a service principal and set up access to Event Hubs.

Parameter | Required? | Default
Tenant ID | Yes | No
Client ID | Yes | No
Client secret | Yes | No

The Kafka interface for Azure Event Hubs can also be accessed through a SAS Token ↗. To authenticate with a SAS token, select Username/Password authentication, using $ConnectionString (no quotes) as the username and your Event Hubs connection string as the password.
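
As a sketch, the resulting credential pair looks like the following; the namespace, policy name, and key are hypothetical placeholders:

Username: $ConnectionString
Password: Endpoint=sb://my-namespace.servicebus.windows.net/;SharedAccessKeyName=my-policy;SharedAccessKey=<key>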

None

Corresponds to Kafka's standard PLAINTEXT protocol.

We highly discourage configuring the connector without authentication or SSL as this will pass unencrypted data between the connector and the Kafka broker. Only use this configuration within secure networks.

Networking

The connector must have access to the host of the Kafka broker. If using a direct connection, create DNS egress policies for all bootstrap server hosts.

Certificates and private keys

You may need to configure additional client or server certificates and private keys for SSL and TLS.

SSL

SSL connections validate server certificates. Normally, SSL validation occurs through a certificate chain; by default, both agent and direct connection runtimes trust most standard certificate chains. However, if the server to which you are connecting uses a self-signed certificate, or if the connection is intercepted in a way that breaks hostname validation, the connector must explicitly trust the certificate. Contact your Kafka administrator for the right certificate to use.
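
If you need to inspect or retrieve the broker's certificate chain yourself, one common approach is to use openssl (a sketch; the host and TLS port are placeholders):

openssl s_client -connect broker-1.example.com:9093 -showcerts </dev/null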

Learn more about using certificates in Data Connection.

Mutual TLS (mTLS)

Your Kafka cluster might require that both the server and client authenticate through mTLS. To enable mTLS, enable Two-way SSL in the connector configuration, provide the SSL key password, and configure a client certificate and private key.

Follow the steps below to configure a client private key based on the connector run type.

Configure client private key for agents

If connecting via an agent, follow the instructions on how to add a private key.

Configure client private key for direct connections

If connecting directly, upload your private key in the Configure client certificates and private key section of the connector configuration page. Use the alias kafka in the pop-up that appears, then add the private key and client certificate.


Sync data from Kafka

Learn how to set up a sync with Kafka in the Set up a streaming sync tutorial.

Schema Registry integration

The Kafka Schema Registry functions as a centralized storage system, maintaining a versioned history of all schemas. The registry offers compatibility with Avro, Protobuf, and JSON schemas, and uses SerDes (Serializer/Deserializer) to facilitate conversions between schema formats and serialized data.

To use the Kafka Schema Registry, register schemas for the relevant Kafka topics and add the Schema Registry URL to the source configuration. The connector can then transform raw bytes into the corresponding data types.
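
For example, registering a hypothetical Avro schema such as the following for a topic's value subject allows the connector to decode each message into firstName and lastName columns instead of raw bytes:

{
  "type": "record",
  "name": "Person",
  "fields": [
    {"name": "firstName", "type": "string"},
    {"name": "lastName", "type": "string"}
  ]
}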

For instance, a standard extraction would typically ingest raw bytes from Kafka into Foundry, as depicted below:

Standard binary Kafka extract.

However, with the Schema Registry configured, the connector can discern the underlying schema of the bytes and transform them into first-class Foundry types, as shown here:

Avro Kafka extract.

This feature can significantly streamline downstream pipelines by removing the need for manual type conversion. It also strengthens data consistency guarantees, since the Schema Registry provides centralized schema management and compatibility checks as schemas evolve.

Export data to Kafka

The connector supports exporting streams to external Kafka clusters via Data Connection.

To export to Kafka, first enable exports for your Kafka connector. Then, create a new export.

Export configuration options

Option | Required? | Default | Description
Topic | Yes | N/A | The Kafka topic to which you want to export.
Linger milliseconds | Yes | 0 | The number of milliseconds to wait before sending records to Kafka. Records that accrue during the waiting time will be batched together when exported. This setting defaults to 0, meaning that records will be sent immediately. However, this default may result in more requests to your Kafka instance.
Key column | No | Undefined | The column from the Foundry stream that you wish to use as the key when publishing records to Kafka. Null keys are not supported; ensure the selected column is populated for all records in the stream being exported.
Value column | No | Undefined | The column from the Foundry stream that you wish to export. If not specified, all fields from the row will be serialized as bytes and exported to Kafka in the body of the message.
Enable Base64 Decode | No | Disabled | Binary data in Foundry streams is Base64 encoded when stored internally. When enabled, this flag will result in binary data being decoded before exporting. This may only be enabled if both Key column and Value column are specified.
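
To make the Enable Base64 Decode flag concrete, consider this illustrative example (not connector output): the ASCII text Hello is stored internally as the Base64 string SGVsbG8=.

# Stored in the Foundry stream (Base64 encoded):
value: "SGVsbG8="
# Exported with Enable Base64 Decode turned on (raw bytes, shown as ASCII):
value: Hello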

Export task configuration (legacy)

We do not recommend exporting to Kafka through export tasks. If possible, existing export tasks should be migrated to use our recommended export capability. The documentation below is intended for historical reference.

To begin exporting data using export tasks, navigate to the Project folder that contains the Kafka connector to which you want to export. Right select on the connector name, then select Create Data Connection Task.

In the left side panel of the Data Connection view:

  1. Verify the Source name matches the Kafka connector you want to use.
  2. Add an Input called dataset of type Streaming dataset. The input dataset is the Foundry dataset being exported.
  3. Add an Output called dataset of type Streaming export. The output dataset is used to run, schedule, and monitor the task.
  4. Finally, add a YAML block in the text field to define the task configuration.

Use the following options when creating the YAML:

Option | Required? | Default | Description
maxParallelism | Yes | No | The maximum allowed number of parallel threads used to export data. The actual number of threads is dictated by the number of partitions on the input Foundry stream (if lower than maxParallelism).
topic | Yes | No | The name of the topic to which data is pushed.
clientId | Yes | No | The identifier to use for the export task. The identifier maps to the Kafka client.id. Review the Kafka documentation ↗ for more information.
batchOptions | Yes | No | See the batchOptions configuration below.
keyColumn | No | No | Name of a column in the input streaming dataset. Values in this column are used as the key in exported messages. Omitting this property will export null values for the key.
valueColumn | No | No | Name of a column in the input streaming dataset. Values in this column are used as the value in exported messages. Omitting this property will export all columns (as a stringified JSON object) under the value field.
enableIdempotence | No | true | Review the Kafka documentation ↗ for more information.
useDirectReaders | Yes | No | Always set to false. Configure as shown in the example below.
transforms | Yes | No | See the example configuration below.

Configure batchOptions using the following options:

Option | Required? | Default | Description
maxMilliseconds | Yes | No | The maximum duration (in milliseconds) to wait before writing available rows to the output topic. Lower this value to reduce latency, increase it to reduce network overhead (number of requests). This value is used unless the batch hits the maxRecords limit first.
maxRecords | Yes | No | The maximum number of messages to buffer before writing to the output topic. Lower this value to reduce latency, increase it to reduce network overhead (number of requests). This value is used unless the batch hits the maxMilliseconds limit first.

The following shows an example export task configuration:

type: streaming-kafka-export-task
config:
  maxParallelism: 1
  topic: test-topic
  clientId: client-id
  keyColumn: key
  valueColumn: value
  batchOptions:
    maxMilliseconds: 5000
    maxRecords: 1000
  transforms:
    transformType: test
    userCodeMavenCoords: []
  useDirectReaders: false

After you configure the export task, select Save in the upper-right corner.