Connecting to Kafka Using Dekaf
Dekaf is Estuary's Kafka API compatibility layer, allowing consumers to read data from Estuary collections as if they were Kafka topics. Additionally, Dekaf provides a schema registry API for managing schemas. This guide will walk you through the steps to connect to Estuary using Dekaf and its schema registry.
Overview
- Collections represent datasets within Estuary. All captured documents are written to a collection, and all materialized documents are read from a collection.
- Dekaf enables you to interact with these collections as though they were Kafka topics, providing seamless integration with existing Kafka-based tools and workflows.
Key Features
- Kafka Topic Emulation: Access Estuary collections as if they were Kafka topics.
- Schema Registry Emulation: Manage and retrieve schemas assigned to Estuary collections, emulating Confluent's Schema Registry.
- Backfill Support: Estuary signals to downstream consumers when offsets need to be reset via Kafka leader epochs.
Connection Details
To connect to Estuary via Dekaf, use the following connection details in conjunction with a Dekaf materialization connector:
- Broker Address: dekaf.estuary-data.com (port 9092, as used in the examples below)
- Schema Registry Address: https://dekaf.estuary-data.com
- Security Protocol: SASL_SSL
- SASL Mechanism: PLAIN
- SASL Username: The full name of your Dekaf materialization, such as YOUR-ORG/YOUR-PREFIX/YOUR-MATERIALIZATION
- SASL Password: The auth token from your Dekaf materialization
- Schema Registry Username: The full name of your Dekaf materialization
- Schema Registry Password: The auth token from your Dekaf materialization
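It can help to keep these connection details in one place. The sketch below maps them onto the kafka-python keyword names used in the client example later in this guide. The helper names `dekaf_consumer_config` and `registry_auth_header` are hypothetical, and the registry header assumes the schema registry accepts its username and password via HTTP Basic authentication, as Confluent-compatible registries usually do.

```python
import base64

# Connection details from the list above (port 9092 as used in this guide's examples).
DEKAF_BROKER = "dekaf.estuary-data.com:9092"
DEKAF_REGISTRY = "https://dekaf.estuary-data.com"


def dekaf_consumer_config(materialization: str, token: str) -> dict:
    """Hypothetical helper: kafka-python consumer kwargs for a Dekaf materialization.

    `materialization` is the full name, e.g. "YOUR-ORG/YOUR-PREFIX/YOUR-MATERIALIZATION",
    and `token` is the auth token configured on that materialization.
    """
    return {
        "bootstrap_servers": DEKAF_BROKER,
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": materialization,  # SASL username = materialization name
        "sasl_plain_password": token,            # SASL password = auth token
    }


def registry_auth_header(materialization: str, token: str) -> dict:
    """Assumed HTTP Basic auth header for the schema registry (same credentials)."""
    raw = f"{materialization}:{token}".encode("utf-8")
    return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}
```

The consumer dictionary can be unpacked directly into `KafkaConsumer(topic, **config)`; add `group_id` and offset settings as your application needs.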
How to Connect to Dekaf
1. Create a Dekaf materialization connector
- From the Estuary dashboard, navigate to the Destinations tab.
- Click New Materialization and choose a Dekaf connector.
  - There are several Dekaf connector variations besides the generic "Dekaf," such as Tinybird. Currently, they don't behave appreciably differently from each other. You may use the different variations to keep your data organized, manage what data you share, and see at a glance where your data is going.
- Provide a name and an auth token for your materialization.
  - The full materialization name, which also includes your organization/prefix, will be used as the username when consumers connect to Dekaf.
  - The auth token that you provide will be used as the password when consumers connect to Dekaf. Make sure to use a secure token.
- (Optional) Adjust additional configuration details, such as the Strict Topic Names or Deletion Mode settings.
- Choose data collections to materialize. Click Source from capture or add individual collections.
  - Each collection you add to the materialization will be available for consumers to read as a topic. By default, the topic name is the collection name.
- Select Next, then Save and Publish.
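The steps above ask for a secure auth token but do not prescribe a format. One reasonable way to generate a high-entropy token is Python's standard `secrets` module. The function name and the 32-byte length below are assumptions for illustration, not Estuary requirements.

```python
import secrets


def generate_dekaf_token(num_bytes: int = 32) -> str:
    """Return a random URL-safe token (hypothetical helper; length is an assumption).

    secrets.token_urlsafe draws from the OS's cryptographically secure generator and
    base64url-encodes the bytes, so the result contains only A-Z, a-z, 0-9, '-' and '_'.
    """
    return secrets.token_urlsafe(num_bytes)


if __name__ == "__main__":
    print(generate_dekaf_token())
```

Paste the printed value into the materialization's auth token field; consumers then supply the same value as their SASL and schema registry password.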
2. Set Up Your Kafka Client
Configure your Kafka client using the connection details provided.
Example Kafka Client Configuration
Below is an example configuration for a Kafka client using Python’s kafka-python library:
from kafka import KafkaConsumer

# Configuration details (replace the placeholders with your own values)
conf = {
    'bootstrap_servers': 'dekaf.estuary-data.com:9092',
    'security_protocol': 'SASL_SSL',
    'sasl_mechanism': 'PLAIN',
    'sasl_plain_username': 'Your_Org/Your_Prefix/Your_Materialization',
    'sasl_plain_password': 'Your_Auth_Token',
    'group_id': 'your_group_id',
    'auto_offset_reset': 'earliest',
    'enable_auto_commit': True,
}

# Create Consumer instance; every key in conf is a KafkaConsumer keyword argument
consumer = KafkaConsumer('your_topic_name', **conf)

# Poll for messages. Values are left as raw bytes: Dekaf serves Avro-encoded
# records (see the kcat example below, which reads them with -s avro), so
# decoding them as UTF-8 text would not yield readable output and can raise
# UnicodeDecodeError.
try:
    for msg in consumer:
        print(f"Received message: partition={msg.partition} offset={msg.offset} value={msg.value!r}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
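The kcat example below reads this same topic with `-s avro` against the schema registry, so message values appear to be Avro-encoded rather than plain text. Confluent-compatible serializers frame each message as a zero "magic" byte, a 4-byte big-endian schema ID, and then the Avro body. The sketch below assumes Dekaf uses that framing (it is what Confluent-compatible tools such as kcat's `-s avro` expect) and only splits the frame. Decoding the body additionally needs an Avro library and the writer schema, which Confluent's Schema Registry API serves at `/schemas/ids/{id}`; that Dekaf's registry emulation exposes this endpoint is an assumption. Both function names are hypothetical.

```python
import struct

MAGIC_BYTE = 0  # Confluent wire-format marker (assumed to be used by Dekaf)


def split_confluent_frame(message: bytes) -> tuple[int, bytes]:
    """Split a Confluent-framed message into (schema_id, avro_body).

    Raises ValueError instead of guessing when the framing is absent, so a
    malformed record surfaces as an error rather than being silently skipped.
    """
    if len(message) < 5 or message[0] != MAGIC_BYTE:
        raise ValueError("not a Confluent wire-format message")
    (schema_id,) = struct.unpack(">I", message[1:5])  # 4-byte big-endian unsigned int
    return schema_id, message[5:]


def schema_url(registry: str, schema_id: int) -> str:
    """URL of the schema for `schema_id` under Confluent's Schema Registry API."""
    return f"{registry.rstrip('/')}/schemas/ids/{schema_id}"
```

In a consumer loop with no `value_deserializer` set, `msg.value` is the raw bytes to pass to `split_confluent_frame`; fetch each schema ID once and cache it rather than calling the registry per message.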
You can also use kcat (formerly known as kafkacat) to test reading messages from an Estuary collection as if it were a Kafka topic.
kcat -C \
-X broker.address.family=v4 \
-X security.protocol=SASL_SSL \
-X sasl.mechanism=PLAIN \
-X sasl.username="Your_Org/Your_Prefix/Your_Materialization" \
-X sasl.password="Your_Auth_Token" \
-b dekaf.estuary-data.com:9092 \
-t "Your_Topic_Name" \
-p 0 \
-o beginning \
-s avro \
-r https://{Your_Org/Your_Prefix/Your_Materialization}:{Your_Auth_Token}@dekaf.estuary-data.com
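The `-r` value embeds the schema registry credentials in the URL itself. Under RFC 3986, a `/` ends the authority part of a URL, so the slashes in a materialization name are not valid inside the URL's userinfo as written; the standard remedy is to percent-encode the username and password (`/` becomes `%2F`). Whether kcat's registry client needs this depends on how it parses the URL, so treat the hypothetical helper below as something to try if registry authentication fails, not as a documented requirement.

```python
from urllib.parse import quote


def registry_url_with_credentials(materialization: str, token: str,
                                  host: str = "dekaf.estuary-data.com") -> str:
    """Build an https URL with percent-encoded userinfo (hypothetical helper).

    quote(..., safe="") encodes every reserved character, including '/' and ':',
    so the username and password cannot be mistaken for URL delimiters.
    """
    user = quote(materialization, safe="")
    password = quote(token, safe="")
    return f"https://{user}:{password}@{host}"


if __name__ == "__main__":
    print(registry_url_with_credentials("Your_Org/Your_Prefix/Your_Materialization",
                                        "Your_Auth_Token"))
```

Pass the printed URL to `-r`, wrapped in single quotes so the shell does not interpret any characters in the token.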
Testing a Dekaf topic with kcat
When a consumer reports missing or unexpected records, read the topic directly with kcat to determine whether the problem lies in what Dekaf serves or in how your client consumes it (partitions, offsets, deserialization). Start from the consume example above and adjust it as described below.
That example pins a single partition with -p 0, which reads only that partition. On a multi-partition topic, records on the other partitions will then look missing; omit -p to read every partition, and add -e so that kcat exits at the end of the topic instead of waiting for more messages.
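The same "all partitions, stop at the end" read can be reproduced from Python when you want to check what your own client code sees. This sketch uses kafka-python's manual assignment (`partitions_for_topic`, `assign`, `seek_to_beginning`, `end_offsets`, `position`, `poll`) instead of a consumer group, so it commits no offsets. The function names are hypothetical, and `connection_kwargs` is assumed to hold only the broker, security, and SASL settings from the client example above (no `group_id` or `enable_auto_commit`).

```python
def caught_up(positions: dict, end_offsets: dict) -> bool:
    """True once every partition's next-read position has reached its end offset."""
    return all(positions[tp] >= end for tp, end in end_offsets.items())


def read_topic_to_end(topic: str, **connection_kwargs) -> list:
    """Read every partition of `topic` from the beginning, then stop (like kcat -e).

    group_id is None and auto-commit is off, so this check leaves your
    application's consumer-group offsets untouched.
    """
    from kafka import KafkaConsumer, TopicPartition  # kafka-python

    consumer = KafkaConsumer(group_id=None, enable_auto_commit=False, **connection_kwargs)
    try:
        partitions = consumer.partitions_for_topic(topic) or set()
        tps = [TopicPartition(topic, p) for p in sorted(partitions)]
        consumer.assign(tps)
        consumer.seek_to_beginning(*tps)
        ends = consumer.end_offsets(tps)  # snapshot: records arriving later are ignored
        records = []
        while not caught_up({tp: consumer.position(tp) for tp in tps}, ends):
            for batch in consumer.poll(timeout_ms=1000).values():
                records.extend(batch)
        return records
    finally:
        consumer.close()
```

Grouping the result by `record.partition` shows whether records you expected were simply on a partition your application was not reading.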
Confirm the topic is served and list its partitions using metadata mode (-L):
kcat -L \
-X security.protocol=SASL_SSL \
-X sasl.mechanism=PLAIN \
-X sasl.username="Your_Org/Your_Prefix/Your_Materialization" \
-X sasl.password="Your_Auth_Token" \
-b dekaf.estuary-data.com:9092
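Count how many records are being served across all partitions (connection flags abbreviated as ...). Printing the numeric offset (%o) gives exactly one line per record, whereas a raw, Avro-encoded key printed with %k can itself contain newline bytes and inflate the count:
kcat -C -e -q ... -t "Your_Topic_Name" -o beginning -f '%o\n' | wc -l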
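To cross-check that count from Python without reading any payloads, you can compare partition offsets instead. This sketch uses kafka-python's `beginning_offsets` and `end_offsets`; the function names are hypothetical. Because Kafka offsets are not guaranteed to be contiguous, the offset span is an upper bound on the record count and equals it only when there are no gaps.

```python
def offset_span(beginning: dict, end: dict) -> int:
    """Sum of (end - beginning) over partitions: an upper bound on the record count."""
    return sum(end[tp] - beginning[tp] for tp in end)


def topic_offset_span(topic: str, **connection_kwargs) -> int:
    """Query partition offsets for `topic` without consuming any records.

    `connection_kwargs` are the broker/security/SASL settings only (assumption).
    """
    from kafka import KafkaConsumer, TopicPartition  # kafka-python

    consumer = KafkaConsumer(enable_auto_commit=False, **connection_kwargs)
    try:
        partitions = consumer.partitions_for_topic(topic) or set()
        tps = [TopicPartition(topic, p) for p in sorted(partitions)]
        return offset_span(consumer.beginning_offsets(tps), consumer.end_offsets(tps))
    finally:
        consumer.close()
```

A span equal to kcat's count means every offset in range was served as a record; a larger span is expected if the topic's offsets have gaps.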
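Locate a specific record and see which partition and offset it landed on. Because keys are Avro-encoded, include the -s avro and -r flags from the consume example among the abbreviated flags so that %k prints a decoded key that grep can match:
kcat -C -e -q ... -t "Your_Topic_Name" -o beginning -f 'p=%p o=%o key=%k\n' | grep Your_Key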
Format tokens: %p partition, %o offset, %k key, %T timestamp, %s payload.
If a record shows up here but not in your application, the problem is client-side: check your consumer-group offsets and your deserializer. Permissive Avro/JSON decoders can silently drop records that fail to decode; switch to a strict or fail-fast mode to surface the error rather than discarding the record.
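The difference between permissive and fail-fast decoding fits in a few lines. In this sketch JSON stands in for whatever format your client decodes, and both function names are hypothetical: the permissive decoder turns a bad record into `None`, which a later filter drops without a trace, while the strict decoder raises an error that names the offending offset.

```python
import json


def decode_permissive(raw: bytes):
    """Returns None on failure; callers that filter out None lose the record silently."""
    try:
        return json.loads(raw)
    except ValueError:  # json.JSONDecodeError and UnicodeDecodeError subclass ValueError
        return None


def decode_strict(raw: bytes, offset: int):
    """Raises with the offset attached, so the bad record is reported, not skipped."""
    try:
        return json.loads(raw)
    except ValueError as exc:
        raise ValueError(f"failed to decode record at offset {offset}") from exc


records = [b'{"id": 1}', b'{"id": 2', b'{"id": 3}']  # the middle record is truncated

# Permissive pipeline: the truncated record vanishes and nothing is logged.
kept = [d for d in map(decode_permissive, records) if d is not None]
```

Running the strict decoder over the same list stops at offset 1 with an explicit error, which is the behavior you want while debugging missing records.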