Integrating Bytewax with SingleStore for a Kafka Sink

By Zander Matheson & David Selassie

Handling real-time data at scale requires the right tools, and managing streaming data is often a balancing act, because speed, flexibility, and reliability don’t always align perfectly. That’s why we’re introducing a new integration: a Bytewax sink for SingleStore, designed to simplify moving data from Kafka into a high-performance, distributed database.

With this integration, you gain more control over your data streams, ensuring efficient transformation and storage without unnecessary complexity.

Here’s what you’ll learn:

  • The advantages of SingleStore for real-time data processing
  • How the Bytewax sink was designed and implemented
  • A clear breakdown of the code so you can modify it as needed
  • Steps to run and test the integration in your own setup

Let’s explore how this integration can streamline your data operations without the usual fuss.



What is SingleStore?

SingleStore (formerly known as MemSQL) is a high-performance, scalable SQL database designed for real-time analytics and operational workloads. It supports massively parallel processing, in-memory storage, and high concurrency, which makes it an excellent fit for streaming applications where low latency and real-time processing are critical.

Integrating SingleStore with Bytewax allows you to seamlessly ingest and process data from Kafka, transform it on-the-fly, and then write it to SingleStore for further analytics or operational use.


Building the SingleStore Sink in Bytewax

Bytewax makes it straightforward to create custom sinks by implementing a simple interface. Our sink is built on top of Bytewax’s DynamicSink and StatelessSinkPartition classes. Let’s walk through the key parts of the implementation.

1. Setting Up the Sink Partition

The sink partition is responsible for writing batches of data into SingleStore. We encapsulate the database connection logic and the SQL insert operation within this partition.

from typing import Any, Dict, List

from mysql.connector import MySQLConnection
from typing_extensions import override  # `typing.override` on Python 3.12+

from bytewax.outputs import StatelessSinkPartition

# Rows are plain dictionaries mapping column names to values.
Row = Dict[str, Any]


class _SingleStoreTableSinkPartition(StatelessSinkPartition[Row]):
    _conn: MySQLConnection

    def __init__(self, conn: MySQLConnection, table: str) -> None:
        self._conn = conn
        self._table = table

    @override
    def write_batch(self, items: List[Row]) -> None:
        cur = self._conn.cursor()
        for row in items:
            # Build the column list and `%s` placeholders dynamically from the row's keys.
            cols_fmt = ", ".join(row.keys())
            vals_fmt = ", ".join(["%s"] * len(row))
            cur.execute(
                f"INSERT INTO {self._table} ({cols_fmt}) VALUES ({vals_fmt})",
                tuple(row.values()),
            )
        # Commit once per batch rather than once per row.
        self._conn.commit()

    @override
    def close(self) -> None:
        self._conn.close()

What’s Happening Here?

  • Initialization: The partition initializes with a MySQL connection (used to connect to SingleStore) and the table name where data will be written.
  • Batch Writing: The write_batch method iterates over each row in the batch, dynamically constructing the column list and the %s placeholders that mysql-connector-python expects, then executes an insert for each row and commits the transaction once the whole batch has been written.
  • Cleanup: The close method ensures that the database connection is properly closed when the partition is shut down.
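If you want to sanity-check the partition outside of a dataflow, you can construct it directly against a test connection and hand it a batch. This is only a minimal sketch: the connection details and the rows below are placeholders, not values from the integration above.

from mysql.connector import MySQLConnection

# Placeholder connection details; point these at your own SingleStore workspace.
conn = MySQLConnection(
    user="dataflow-test",
    password="...",
    host="your-workspace.svc.singlestore.com",
    port=3306,
    database="demo",
)

part = _SingleStoreTableSinkPartition(conn, "metrics")

# The dict keys must match column names in the target table.
part.write_batch(
    [
        {"name": "ada", "favorite_number": 7, "favorite_color": "green"},
        {"name": "gus", "favorite_number": None, "favorite_color": "blue"},
    ]
)
part.close()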

2. Defining the Dynamic Sink

Next, we implement the SingleStoreTableSink that uses our partition. The dynamic sink is responsible for creating partitions based on the worker context.

from typing import Dict, Optional

from bytewax.outputs import DynamicSink


class SingleStoreTableSink(DynamicSink[Row]):
    def __init__(
        self,
        user: str,
        password: str,
        host: str,
        database: str,
        table: str,
        add_config: Optional[Dict] = None,
    ) -> None:
        self._user = user
        self._password = password
        self._host = host
        self._database = database
        self._table = table
        self._add_config = {} if add_config is None else add_config

    @override
    def build(
        self,
        step_id: str,
        worker_index: int,
        worker_count: int,
    ) -> _SingleStoreTableSinkPartition:
        conn = MySQLConnection(
            user=self._user,
            password=self._password,
            host=self._host,
            database=self._database,
            **self._add_config,
        )
        return _SingleStoreTableSinkPartition(conn, self._table)

Key Points:

  • Initialization: We pass in the connection details for SingleStore (user, password, host, database, and target table). An optional configuration dictionary (add_config) lets you supply any additional parameters required by the MySQLConnection.
  • Building Partitions: The build method creates a new connection and returns an instance of our sink partition. This method is called for each worker, ensuring that every worker in your Bytewax dataflow has its own dedicated connection to SingleStore.
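For example, add_config is a convenient place for any extra keyword arguments that MySQLConnection accepts, such as a non-default port or a connection timeout. A minimal sketch with placeholder values:

# Placeholder credentials; `add_config` is forwarded verbatim to MySQLConnection.
ss_sink = SingleStoreTableSink(
    user="dataflow-test",
    password="...",
    host="your-workspace.svc.singlestore.com",
    database="demo",
    table="metrics",
    add_config={
        "port": 3333,
        "connection_timeout": 10,  # seconds
    },
)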

3. Integrating with a Bytewax Dataflow

Now that we have our sink defined, let’s see how it fits into a Bytewax dataflow that reads from Kafka, processes messages, and writes the results to SingleStore.

flow = Dataflow("ss_test")

# Create a Kafka input source.
k_msgs = opk.input("kafka_in", flow, brokers=["localhost:1234"], topics=["test"])

# Define the Avro schema for deserialization.
schema = """
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number",  "type": ["int", "null"]},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}
"""

# Deserialize Kafka messages using Avro.
deser_msgs = opk.deserialize_value("deser", k_msgs.oks, PlainAvroDeserializer(schema))

# Format the message to a dictionary (Row).
def fmtter(msg: KafkaSourceMessage[Optional[bytes], object]) -> Row:
    return cast(Row, msg.value)

prep_inserts = op.map("prep", deser_msgs.oks, fmtter)

# Set up the SingleStore sink with your connection details.
ss_sink = SingleStoreTableSink(
    user="dataflow-test",
    password="...",
    host="svc-3482219c-a389-4079-b18b-d50662524e8a-shared-dml.aws-virginia-6.svc.singlestore.com",
    database="db_david_65f37",
    table="metrics",
    # MySQLConnection takes the port separately from the host.
    add_config={"port": 3333},
)

# Attach the sink to the dataflow.
op.output("ss_out", prep_inserts, ss_sink)

What’s Happening in the Dataflow?

  1. Kafka Source: We create a Kafka input source using Bytewax’s Kafka connector, specifying the broker and topic.
  2. Deserialization: We define an Avro schema for our messages and deserialize them with PlainAvroDeserializer. This converts raw Kafka messages into structured Python dictionaries.
  3. Mapping: The fmtter function converts the deserialized message into a row format (a dictionary) expected by our sink.
  4. Output: Finally, we create an instance of our SingleStoreTableSink with the necessary connection parameters and connect it as the output operator of the dataflow.
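Because the sink builds each INSERT from the row's keys, the target table needs a column per Avro field. One possible layout, created here with the same MySQL client the sink uses (the table name comes from the example above; the credentials and column types are illustrative assumptions):

from mysql.connector import MySQLConnection

# One-off setup with placeholder credentials; adjust the types to your data.
conn = MySQLConnection(
    user="dataflow-test",
    password="...",
    host="your-workspace.svc.singlestore.com",
    port=3333,
    database="demo",
)
cur = conn.cursor()
cur.execute(
    """
    CREATE TABLE IF NOT EXISTS metrics (
        name TEXT NOT NULL,
        favorite_number INT,
        favorite_color TEXT
    )
    """
)
conn.close()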

Running the Integration

To test the integration:

  1. Set Up Your Environment:

    • Ensure your Kafka cluster is running and accessible.
    • Confirm that your SingleStore database is up and running, and that the target table (metrics in this example) exists.
    • Install the necessary Python packages (bytewax with its Kafka connector dependencies, mysql-connector-python, etc.).
  2. Configure Connection Details:

    • Update the SingleStore connection parameters (user, password, host, database, table) in the SingleStoreTableSink instantiation.
    • Adjust the Kafka brokers and topics as needed.
  3. Run Your Dataflow:

    • Execute your Bytewax dataflow script. You should see your data streaming from Kafka, processed by Bytewax, and then inserted into your SingleStore table.
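One quick way to run everything locally is Bytewax's in-process test runner; for real deployments you would typically launch the script with `python -m bytewax.run your_module:flow` instead. A minimal sketch, assuming the dataflow above is built in the current module as flow:

# Run the dataflow on a single in-process worker; handy for local testing.
from bytewax.testing import run_main

run_main(flow)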

Conclusion

Integrating Bytewax with SingleStore enables efficient real-time data streaming from Kafka while maintaining flexibility. By leveraging DynamicSink and StatelessSinkPartition, this approach streamlines data ingestion, batch processing, and connection management.

To get the best performance in production, you will want to tune batch sizes, index your SingleStore tables appropriately, and add error handling around the writes (one possible retry pattern is sketched below). This integration also provides a foundation for adapting Bytewax to other database sinks, making it a practical choice for real-time data workflows.
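As one example of that error handling, a failed batch write could be retried with a short backoff before giving up. This is only a sketch of the idea; the helper name and retry policy here are ours, not part of the sink above.

import time

from mysql.connector import Error as MySQLError


def write_batch_with_retry(
    partition: _SingleStoreTableSinkPartition,
    items: List[Row],
    retries: int = 3,
    backoff_s: float = 1.0,
) -> None:
    # Retry the whole batch a few times with a growing pause between attempts.
    for attempt in range(retries):
        try:
            partition.write_batch(items)
            return
        except MySQLError:
            if attempt == retries - 1:
                raise
            time.sleep(backoff_s * (attempt + 1))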

We hope this tutorial helps you get started with your own integrations. If you have any questions, feedback, or want to share your experiences, feel free to comment below or reach out on our community channels.

Happy streaming! 💛💛


For more guides, tutorials, and updates from Bytewax, be sure to subscribe to our blog and follow us on social media: ➡️ X and LinkedIn, ⭐️ GitHub.
