Bytewax

This guide demonstrates how to use Estuary Flow to stream data to Bytewax using the Kafka-compatible Dekaf API.

Bytewax is a Python framework for building scalable dataflow applications, designed for high-throughput, low-latency data processing tasks.

Connecting Estuary Flow to Bytewax

  1. Generate a refresh token for the Bytewax connection from the Estuary Admin Dashboard.

  2. Install Bytewax together with its Kafka connector dependencies (the connector is built on confluent-kafka):

    pip install "bytewax[kafka]"
  3. Create a Python script for your Bytewax dataflow, using the following template:

    # Written against the Bytewax 0.18+ operator API.
    # The Kafka connector needs confluent-kafka: pip install "bytewax[kafka]"
    import json
    import os

    import bytewax.operators as op
    from bytewax.connectors.kafka import KafkaSource
    from bytewax.connectors.stdio import StdOutSink
    from bytewax.dataflow import Dataflow

    # Estuary Flow Dekaf configuration
    KAFKA_BOOTSTRAP_SERVERS = "dekaf.estuary.dev:9092"
    KAFKA_TOPIC = "/full/nameof/your/collection"

    # Parse incoming messages
    def parse_message(msg):
        # Each item is a KafkaSourceMessage; the JSON payload is in msg.value (bytes)
        data = json.loads(msg.value)
        # Process your data here
        return data

    # Authenticate to Dekaf over SASL_SSL; the refresh token is read from DEKAF_TOKEN
    src = KafkaSource(
        brokers=[KAFKA_BOOTSTRAP_SERVERS],
        topics=[KAFKA_TOPIC],
        add_config={
            "security.protocol": "SASL_SSL",
            "sasl.mechanism": "PLAIN",
            "sasl.username": "{}",
            "sasl.password": os.environ["DEKAF_TOKEN"],
        },
    )

    # Define your dataflow
    flow = Dataflow("estuary_dekaf")
    stream = op.input("input", flow, src)
    parsed = op.map("parse", stream, parse_message)
    # Add more processing steps as needed
    op.output("output", parsed, StdOutSink())

    if __name__ == "__main__":
        # run_main executes the dataflow in this process on a single worker
        from bytewax.testing import run_main
        run_main(flow)
  4. Replace "/full/nameof/your/collection" with the full name of your Estuary Flow collection, and set the DEKAF_TOKEN environment variable to the refresh token you generated in step 1.

  5. Run your Bytewax dataflow:

    python your_dataflow_script.py
  6. Your Bytewax dataflow is now processing data from Estuary Flow in real time.
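
The template in step 3 depends on two things that can fail at runtime: the refresh token must be available in the `DEKAF_TOKEN` environment variable, and each message value must be valid JSON. The sketch below shows one way to fail fast on a missing token and to tolerate malformed payloads. The helper names `dekaf_config` and `safe_parse` are hypothetical (they are not part of Bytewax or Estuary Flow), and the code assumes message values arrive as JSON-encoded bytes.

```python
import json
import os
from typing import Any, Dict, Mapping, Optional


def dekaf_config(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Build the SASL settings used in the template (hypothetical helper)."""
    token = env.get("DEKAF_TOKEN")
    if not token:
        # Fail early with a clear message instead of a later auth error
        raise RuntimeError("Set DEKAF_TOKEN to your Estuary refresh token")
    return {
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.username": "{}",
        "sasl.password": token,
    }


def safe_parse(value: Optional[bytes]) -> Optional[Any]:
    """Decode a JSON payload; return None for empty or malformed input."""
    if not value:
        return None
    try:
        return json.loads(value)
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
```

In the dataflow, the result of `dekaf_config()` could be passed as the `add_config` argument, and `safe_parse` applied to each message value, with a later step dropping the `None` results.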