Bytewax
This guide demonstrates how to use Estuary Flow to stream data to Bytewax using the Kafka-compatible Dekaf API.
Bytewax is a Python framework for building scalable dataflow applications, designed for high-throughput, low-latency data processing tasks.
Connecting Estuary Flow to Bytewax
-
Generate a refresh token for the Bytewax connection from the Estuary Admin Dashboard.
-
Install Bytewax together with its Kafka extra, which pulls in the confluent-kafka client that the Bytewax Kafka connector uses:
pip install "bytewax[kafka]"
-
Create a Python script for your Bytewax dataflow, using the following template:
# Written against the operator-based Bytewax API (0.18 and later);
# older releases use different import paths and method names.
import json
import os

from bytewax import operators as op
from bytewax.connectors.kafka import KafkaSource
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow

# Estuary Flow Dekaf configuration
KAFKA_BOOTSTRAP_SERVERS = "dekaf.estuary.dev:9092"
KAFKA_TOPIC = "/full/nameof/your/collection"

# Parse incoming messages
def parse_message(msg):
    # KafkaSource emits message objects; the raw payload is in msg.value
    data = json.loads(msg.value)
    # Process your data here
    return data

# Define your dataflow
src = KafkaSource(
    brokers=[KAFKA_BOOTSTRAP_SERVERS],
    topics=[KAFKA_TOPIC],
    add_config={
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.username": "{}",
        # The refresh token is read from the DEKAF_TOKEN environment variable
        "sasl.password": os.getenv("DEKAF_TOKEN"),
    },
)
flow = Dataflow("estuary_dekaf")
stream = op.input("input", flow, src)
parsed = op.map("parse", stream, parse_message)
# Add more processing steps as needed
op.output("output", parsed, StdOutSink())

if __name__ == "__main__":
    from bytewax.testing import run_main
    run_main(flow)
-
Replace "/full/nameof/your/collection" with the full name of your collection in Estuary Flow.
-
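Set the DEKAF_TOKEN environment variable to the refresh token you generated earlier, then run your Bytewax dataflow:
export DEKAF_TOKEN="your-refresh-token"
python your_dataflow_script.py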
-
Your Bytewax dataflow is now processing data from Estuary Flow in real time.
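The template reads the token with os.getenv and parses every message with json.loads, so a missing token or a malformed payload only surfaces at runtime. The sketch below shows one way to make both steps more defensive. The helper names load_dekaf_token and parse_payload are hypothetical and not part of Bytewax or Estuary Flow. The sketch also assumes the payloads are JSON objects, as the template does.

```python
import json
import os


def load_dekaf_token(env=None, var="DEKAF_TOKEN"):
    """Return the refresh token from the environment, failing fast if unset.

    `env` defaults to os.environ; passing a plain dict makes this testable.
    """
    env = os.environ if env is None else env
    token = env.get(var, "").strip()
    if not token:
        raise RuntimeError(
            f"{var} is not set; export your Estuary refresh token first"
        )
    return token


def parse_payload(raw):
    """Decode one message payload (str or bytes) into a dict.

    Returns None for empty, malformed, or non-object payloads so that a
    later step can drop them instead of crashing the dataflow.
    """
    if raw is None:
        return None
    try:
        data = json.loads(raw)
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError
        return None
    return data if isinstance(data, dict) else None


if __name__ == "__main__":
    print(parse_payload(b'{"id": 1, "name": "example"}'))
    print(parse_payload(b"not json"))
```

In the template, parse_message could call parse_payload on the message payload, and a following step could discard the None results. Whether skipping bad records is acceptable depends on your pipeline, and raising instead may be the better choice when every record must be accounted for.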