Apache Kafka

This connector captures streaming data from Apache Kafka topics.

Supported message formats

This connector supports Kafka messages encoded in Avro, Protobuf, or JSON format.

For Avro and Protobuf messages, the connector must be configured to use a schema registry. Schema references (import for Protobuf) are supported for Protobuf schemas but not for Avro or JSON schemas.

Protobuf field names are serialized using the proto field name (snake_case), not the default camelCase JSON mapping.
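
To illustrate the difference, the following Python sketch approximates the lowerCamelCase name that the default Protobuf JSON mapping would produce for a field. The helper `default_json_name` and the sample field names are hypothetical and not part of the connector. Captured documents keep the name on the left-hand side.

```python
def default_json_name(proto_field_name: str) -> str:
    """Approximate the default Protobuf JSON name (lowerCamelCase) for a field.

    Illustrative only: the connector does NOT apply this mapping and keeps
    the proto field name as written in the .proto file.
    """
    head, *rest = proto_field_name.split("_")
    return head + "".join(part[:1].upper() + part[1:] for part in rest)


for name in ["user_id", "created_at", "email"]:
    # Left: field name in captured documents. Right: default JSON mapping name.
    print(f"{name} -> {default_json_name(name)}")
```

A downstream consumer that expects `userId` would therefore need to read `user_id` from collections captured by this connector.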

JSON messages may be read without a schema registry. If the JSON messages were encoded with a JSON schema, configuring a schema registry is recommended to enable discovery of collection keys if the message key has an associated schema.

Prerequisites

  • A Kafka cluster with:
    • bootstrap.servers configured so that clients may connect via the desired host and port
    • An authentication mechanism of choice set up (highly recommended for production environments)
    • Connection security enabled with TLS (highly recommended for production environments)
  • If using schema registry:
    • The endpoint to use for connecting to the schema registry
    • Username for authentication
    • Password for authentication
  • For Avro and JSON schemas: flat schemas only (no schema references / $ref). Protobuf schemas support references (import).
tip

If you are using the Confluent Cloud Schema Registry, your schema registry username and password will be the key and secret from your schema registry API key. See the Confluent Cloud Schema Registry Documentation for help setting up a schema registry API key.

Discovered collection schemas

If no schema registry is configured, all available topics will be discovered, and each collection will use a key composed of the captured message's partition and offset. If a schema registry is configured, Estuary collections for Kafka topics will be discovered using the latest version of the registered key schema for the topic.

For a collection key to be discovered from a registered topic key schema, the topic key schema must be compatible with an Estuary collection key, with the following additional considerations:

  • Key fields must not contain null as a type
  • Key fields can be a single type only
  • Keys may contain nested fields, such as types with nested Avro records

If a topic has a registered key schema but it does not fit these requirements, the default collection key of partition and offset will be used instead.
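
The considerations above can be sketched as a small check over an Avro key schema. This is a minimal illustration, not the connector's implementation. The JSON-pointer spelling of the default key, the `/_key` pointer for a top-level scalar key, and the set of Avro primitives treated as key-compatible are all assumptions made for the example.

```python
from typing import Any, Optional

# Fallback key described above; the JSON-pointer spelling is an assumption.
DEFAULT_KEY = ["/_meta/partition", "/_meta/offset"]

# Assumption: Avro primitives treated as key-compatible in this sketch.
KEY_COMPATIBLE = {"string", "int", "long", "boolean"}


def key_pointers(avro_type: Any, path: str = "") -> Optional[list]:
    """Return JSON pointers for the key fields, or None if a rule is broken."""
    if isinstance(avro_type, list):
        # A union carries more than one type (commonly including "null").
        return None
    if isinstance(avro_type, dict):
        if avro_type.get("type") == "record":
            pointers = []
            for field in avro_type.get("fields", []):
                nested = key_pointers(field["type"], f"{path}/{field['name']}")
                if nested is None:
                    return None
                pointers.extend(nested)
            return pointers or None
        # Unwrap annotated types such as {"type": "string", "logicalType": "uuid"}.
        return key_pointers(avro_type.get("type"), path)
    if avro_type in KEY_COMPATIBLE:
        # A top-level scalar key has no field name; "/_key" is assumed here.
        return [path or "/_key"]
    return None


def collection_key(key_schema: Any) -> list:
    """Use the discovered key when eligible, otherwise fall back to the default."""
    return key_pointers(key_schema) or DEFAULT_KEY


nested_key = {
    "type": "record",
    "name": "OrderKey",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "region", "type": {
            "type": "record",
            "name": "Region",
            "fields": [{"name": "code", "type": "string"}],
        }},
    ],
}
nullable_key = {
    "type": "record",
    "name": "OrderKey",
    "fields": [{"name": "id", "type": ["null", "long"]}],
}

print(collection_key(nested_key))    # nested record fields are accepted
print(collection_key(nullable_key))  # nullable field falls back to the default
```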

Captured document structure

Each captured document is constructed from the Kafka message's key, value, and metadata:

  • Message value fields become top-level fields in the captured document.
  • Message key fields are merged as top-level fields. If there is a name collision between key and value fields, the key field takes precedence.
  • If the key is a scalar type (not an object/record), it is captured under a synthetic _key field.
  • If a key is present but has no schema, it is captured as a base64-encoded string under _key.
  • A null message payload (absent value) is interpreted as a deletion tombstone (_meta.op is set to "d").
  • Metadata is captured under the _meta object, including:
    • topic — the Kafka topic name
    • partition — the partition number
    • offset — the message offset
    • op — the operation type ("u" for upsert, "d" for delete)
    • headers — message headers (if present), as key-value pairs
    • timestamp — the message timestamp (if available), either creation time or log append time
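
The rules above can be sketched in Python as follows. This is an illustration under stated assumptions rather than the connector's actual code: the function name `build_document` is hypothetical, a schemaless key is modeled as raw `bytes`, and the metadata dictionary is supplied by the caller.

```python
import base64
from typing import Any, Optional


def build_document(key: Any, value: Optional[dict], meta: dict) -> dict:
    """Combine a decoded Kafka key, value, and metadata into one document."""
    doc = dict(value or {})  # value fields become top-level fields

    if isinstance(key, dict):
        doc.update(key)  # key fields take precedence on name collisions
    elif isinstance(key, bytes):
        # Key without a schema (modeled as bytes): store as a base64 string.
        doc["_key"] = base64.b64encode(key).decode("ascii")
    elif key is not None:
        doc["_key"] = key  # scalar key goes under the synthetic _key field

    # An absent value is a deletion tombstone; anything else is an upsert.
    doc["_meta"] = {**meta, "op": "d" if value is None else "u"}
    return doc


meta = {"topic": "orders", "partition": 0, "offset": 42}
upsert = build_document({"id": 7}, {"id": 1, "name": "a"}, meta)
tombstone = build_document(b"\x01\x02", None, meta)
print(upsert)
print(tombstone)
```

In the first call the key's `id` overrides the value's `id`. In the second call the missing value produces a document that contains only `_key` and `_meta`, with `op` set to `"d"`.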

Authentication and connection security

Neither authentication nor connection security is enabled by default in your Kafka cluster, but both are important considerations. Similarly, Estuary's Kafka connectors do not strictly require authentication or connection security mechanisms. You may choose to omit them for local development and testing; however, both are strongly encouraged for production environments.

A wide variety of authentication methods is available in Kafka clusters. Estuary supports SASL/SCRAM-SHA-256, SASL/SCRAM-SHA-512, and SASL/PLAIN. Behavior using other authentication methods is not guaranteed. When authentication details are not provided, the client connection will attempt to use the PLAINTEXT (insecure) protocol.

If you don't already have authentication enabled on your cluster, Estuary recommends either of the listed SASL/SCRAM methods. With SCRAM, you set up a username and password, making it analogous to the traditional authentication mechanisms you use in other applications.

Tip: If you are connecting to Kafka hosted on Confluent Cloud, select the PLAIN SASL mechanism.

For connection security, Estuary recommends that you enable TLS encryption for your SASL mechanism of choice, as well as all other components of your cluster. Note that because TLS replaced now-deprecated SSL encryption, Kafka still uses the acronym "SSL" to refer to TLS encryption. See Confluent's documentation for details.
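
As an illustration of the "SSL" naming, standard Kafka clients select the transport through a `security.protocol` setting whose values are `PLAINTEXT`, `SSL`, `SASL_PLAINTEXT`, and `SASL_SSL`. The sketch below maps the two choices discussed in this section onto those values. The function `security_protocol` is a hypothetical helper that describes general Kafka client naming, not how this connector is implemented.

```python
def security_protocol(use_sasl: bool, use_tls: bool) -> str:
    """Return the Kafka client security.protocol value for a given setup.

    "SSL" in these names refers to TLS encryption.
    """
    if use_sasl:
        return "SASL_SSL" if use_tls else "SASL_PLAINTEXT"
    return "SSL" if use_tls else "PLAINTEXT"


# Recommended for production: SASL authentication over TLS.
print(security_protocol(use_sasl=True, use_tls=True))
# Local development only: no authentication and no encryption.
print(security_protocol(use_sasl=False, use_tls=False))
```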

Beta: TLS encryption is currently the only supported connection security mechanism for this connector. Other connection security methods may be enabled in the future.

AWS Managed Streaming Kafka (MSK)

If using AWS Managed Streaming for Apache Kafka (MSK), you can use IAM authentication with our connector. Read more about IAM authentication with MSK in AWS docs: IAM access control.

Additionally, make sure that your VPC configuration allows inbound and outbound traffic to and from Estuary IP addresses.

Configuration

You configure connectors either in the Estuary web app, or by directly editing the catalog specification file. See connectors to learn more about using connectors. The values and specification sample below provide configuration details specific to the Apache Kafka source connector.

Properties

Endpoint

| Property | Title | Description | Type | Required/Default |
|---|---|---|---|---|
| /bootstrap_servers | Bootstrap servers | The initial servers in the Kafka cluster to connect to, separated by commas. The Kafka client will be informed of the rest of the cluster nodes by connecting to one of these nodes. | string | Required |
| /tls | TLS | TLS connection settings. | string | "system_certificates" |
| /credentials | Credentials | Connection details used to authenticate a client connection to Kafka via SASL. | null, object | |
| /credentials/auth_type | Authentication type | One of UserPassword for SASL or AWS for IAM authentication. | string | |
| /credentials/mechanism | SASL Mechanism | SASL mechanism describing how to exchange and authenticate client servers. | string | |
| /credentials/password | Password | Password, if applicable for the authentication mechanism chosen. | string | |
| /credentials/username | Username | Username, if applicable for the authentication mechanism chosen. | string | |
| /credentials/aws_access_key_id | AWS Access Key ID | Supply if using auth_type: AWS. | string | |
| /credentials/aws_secret_access_key | AWS Secret Access Key | Supply if using auth_type: AWS. | string | |
| /credentials/region | AWS Region | Supply if using auth_type: AWS. | string | |
| /schema_registry | Schema Registry | Connection details for interacting with a schema registry. | object | Required |
| /schema_registry/schema_registry_type | Schema Registry Type | Either confluent_schema_registry or no_schema_registry. | string | Required |
| /schema_registry/endpoint | Schema Registry Endpoint | Schema registry API endpoint. For example: https://registry-id.us-east-2.aws.confluent.cloud. | string | |
| /schema_registry/username | Schema Registry Username | Schema registry username to use for authentication. If you are using Confluent Cloud, this will be the 'Key' from your schema registry API key. | string | |
| /schema_registry/password | Schema Registry Password | Schema registry password to use for authentication. If you are using Confluent Cloud, this will be the 'Secret' from your schema registry API key. | string | |
| /schema_registry/enable_json_only | Capture Messages in JSON Format Only | If no schema registry is configured, the capture will attempt to parse all data as JSON, and discovered collections will use a key of the message partition and offset. | boolean | |

Bindings

| Property | Title | Description | Type | Required/Default |
|---|---|---|---|---|
| /topic | Stream | Kafka topic name. | string | Required |

Sample

User and password authentication (SASL):

captures:
  ${PREFIX}/${CAPTURE_NAME}:
    endpoint:
      connector:
        image: ghcr.io/estuary/source-kafka:v1
        config:
          bootstrap_servers: server1:9092,server2:9092
          tls: system_certificates
          credentials:
            auth_type: UserPassword
            mechanism: SCRAM-SHA-512
            username: bruce.wayne
            password: definitely-not-batman
          schema_registry:
            schema_registry_type: confluent_schema_registry
            endpoint: https://schema.registry.com
            username: schemaregistry.username
            password: schemaregistry.password
    bindings:
      - resource:
          topic: ${TOPIC_NAME}
        target: ${PREFIX}/${COLLECTION_NAME}

AWS IAM authentication:

captures:
  ${PREFIX}/${CAPTURE_NAME}:
    endpoint:
      connector:
        image: ghcr.io/estuary/source-kafka:v1
        config:
          bootstrap_servers: server1:9092,server2:9092
          tls: system_certificates
          credentials:
            auth_type: AWS
            aws_access_key_id: AK...
            aws_secret_access_key: secret
            region: us-east-1
          schema_registry:
            schema_registry_type: confluent_schema_registry
            endpoint: https://schema.registry.com
            username: schemaregistry.username
            password: schemaregistry.password
    bindings:
      - resource:
          topic: ${TOPIC_NAME}
        target: ${PREFIX}/${COLLECTION_NAME}

Your capture definition will likely be more complex, with additional bindings for each Kafka topic.
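
As a rough sketch of what additional bindings look like, the Python snippet below builds one binding entry per topic, following the shape of the `bindings` section in the samples above. The helper `bindings_for`, the topic names, and the `acmeCo/kafka` prefix are hypothetical. Naming each collection after its topic is only one possible convention.

```python
import json


def bindings_for(topics: list, prefix: str) -> list:
    """Build one binding per Kafka topic, mirroring the samples above."""
    return [
        {"resource": {"topic": topic}, "target": f"{prefix}/{topic}"}
        for topic in topics
    ]


# Hypothetical topic names and collection prefix.
print(json.dumps(bindings_for(["orders", "payments"], "acmeCo/kafka"), indent=2))
```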

Learn more about capture definitions.