Bytewax
This guide uses a legacy method of connecting with Dekaf and is presented for historical purposes. For new integrations or to migrate your existing Dekaf setup to the new workflow, see the Dekaf materialization connector.
This guide demonstrates how to use Estuary Flow to stream data to Bytewax using the Kafka-compatible Dekaf API.
Bytewax is a Python framework for building scalable dataflow applications, designed for high-throughput, low-latency data processing tasks.
Connecting Estuary Flow to Bytewax
- Generate a refresh token for the Bytewax connection from the Estuary Admin Dashboard.
- Install Bytewax and the Confluent Kafka Python client, which the Bytewax Kafka connector uses:
    pip install bytewax confluent-kafka
- Create a Python script for your Bytewax dataflow, using the following template:
    # Written for the Bytewax operators API (0.18 and later).
    import json
    import os

    import bytewax.operators as op
    from bytewax.connectors.kafka import KafkaSource
    from bytewax.connectors.stdio import StdOutSink
    from bytewax.dataflow import Dataflow

    # Estuary Flow Dekaf configuration
    KAFKA_BOOTSTRAP_SERVERS = ["dekaf.estuary-data.com:9092"]
    KAFKA_TOPIC = "/full/nameof/your/collection"

    # Parse incoming messages
    def parse_message(msg):
        # msg.value holds the raw message payload as bytes
        data = json.loads(msg.value)
        # Process your data here
        return data

    # Authenticate to Dekaf over SASL_SSL. The username is the literal
    # string "{}" and the password is the refresh token, read from the
    # DEKAF_TOKEN environment variable.
    src = KafkaSource(
        brokers=KAFKA_BOOTSTRAP_SERVERS,
        topics=[KAFKA_TOPIC],
        add_config={
            "security.protocol": "SASL_SSL",
            "sasl.mechanism": "PLAIN",
            "sasl.username": "{}",
            "sasl.password": os.environ["DEKAF_TOKEN"],
        },
    )

    # Define your dataflow
    flow = Dataflow("estuary_dekaf")
    stream = op.input("input", flow, src)
    parsed = op.map("parse", stream, parse_message)
    # Add more processing steps as needed
    op.output("output", parsed, StdOutSink())

    if __name__ == "__main__":
        from bytewax.testing import run_main

        run_main(flow)
- Replace "/full/nameof/your/collection" with your actual collection name from Estuary Flow.
- Set the DEKAF_TOKEN environment variable to your refresh token, then run your Bytewax dataflow:
    python your_dataflow_script.py
- Your Bytewax dataflow is now processing data from Estuary Flow in real time.
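The connection settings and message parsing from the steps above can be factored into small helpers that fail early on a missing token and tolerate malformed payloads. The following is a minimal sketch, not part of the Estuary or Bytewax APIs: the helper names `dekaf_sasl_config` and `parse_value` are hypothetical, and it assumes the refresh token is stored in the `DEKAF_TOKEN` environment variable and that message payloads are JSON.

```python
import json
import os
from typing import Any, Dict, Optional


def dekaf_sasl_config(env_var: str = "DEKAF_TOKEN") -> Dict[str, str]:
    """Build the SASL settings used in this guide (hypothetical helper).

    Raises RuntimeError if the environment variable is unset or empty,
    so a missing token is reported before any connection is attempted.
    """
    token = os.environ.get(env_var)
    if not token:
        raise RuntimeError(f"Set {env_var} to your Estuary refresh token")
    return {
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.username": "{}",  # literal string, as in the template above
        "sasl.password": token,
    }


def parse_value(raw: Optional[bytes]) -> Optional[Any]:
    """Decode one message payload as JSON (hypothetical helper).

    Returns None for empty, undecodable, or malformed payloads
    instead of raising, so one bad record does not stop the dataflow.
    """
    if raw is None:
        return None
    try:
        return json.loads(raw)
    except ValueError:
        # Covers both json.JSONDecodeError and UnicodeDecodeError.
        return None
```

In the template, one possible use is to pass `dekaf_sasl_config()` as the `add_config` argument and call `parse_value(msg.value)` inside `parse_message`, then drop `None` results in a later step.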