![](https://images.production.bytewax.io/singlestore_c50d659a95.png)
Handling real-time data at scale requires the right tools, and managing streaming data is often a balancing act: speed, flexibility, and reliability don't always align perfectly. That's why we're introducing a new integration: a Bytewax sink for SingleStore, designed to simplify the process of moving data from Kafka into a high-performance, distributed database.
With this integration, you gain more control over your data streams, ensuring efficient transformation and storage without unnecessary complexity.
Here’s what you’ll learn:
- The advantages of SingleStore for real-time data processing
- How the Bytewax sink was designed and implemented
- A clear breakdown of the code so you can modify it as needed (or skip straight to the code below)
- Steps to run and test the integration in your own setup
Let’s explore how this integration can streamline your data operations without the usual fuss.
What is SingleStore?
SingleStore (formerly known as MemSQL) is a high-performance, scalable SQL database designed for real-time analytics and operational workloads. It supports massively parallel processing, in-memory storage, and high concurrency. SingleStore is an excellent fit for streaming applications where low latency and real-time processing are critical.
Integrating SingleStore with Bytewax allows you to seamlessly ingest and process data from Kafka, transform it on the fly, and then write it to SingleStore for further analytics or operational use.
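A detail that makes the sink below possible: SingleStore speaks the MySQL wire protocol, so an ordinary MySQL client library such as `mysql-connector-python` can talk to it. As a quick sanity check before wiring up any streaming, a minimal connectivity sketch might look like this (the host, credentials, and database are placeholders):

```python
# Minimal connectivity check against SingleStore via the MySQL wire protocol.
# Host, credentials, and database names are placeholders for illustration.
from mysql.connector import MySQLConnection

conn = MySQLConnection(
    user="dataflow-test",
    password="...",
    host="svc-example.aws-virginia-6.svc.singlestore.com",
    port=3306,
    database="example_db",
)
cur = conn.cursor()
cur.execute("SELECT 1")
print(cur.fetchone())  # -> (1,)
cur.close()
conn.close()
```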
Building the SingleStore Sink in Bytewax
Bytewax makes it straightforward to create custom sinks by implementing a simple interface. Our sink is built on top of Bytewax's `DynamicSink` and `StatelessSinkPartition` classes. Let's walk through the key parts of the implementation.
1. Setting Up the Sink Partition
The sink partition is responsible for writing batches of data into SingleStore. We encapsulate the database connection logic and the SQL insert operation within this partition.
```python
from typing import Any, Dict, List

from mysql.connector import MySQLConnection
from typing_extensions import override

from bytewax.outputs import StatelessSinkPartition

# A Row is a plain dict mapping column names to values.
Row = Dict[str, Any]


class _SingleStoreTableSinkPartition(StatelessSinkPartition[Row]):
    _conn: MySQLConnection

    def __init__(self, conn: MySQLConnection, table: str) -> None:
        self._conn = conn
        self._table = table

    @override
    def write_batch(self, items: List[Row]) -> None:
        cur = self._conn.cursor()
        for row in items:
            # Prepare column and placeholder formats dynamically.
            cols_fmt = ", ".join(row.keys())
            vals_fmt = ", ".join(["?"] * len(row))
            cur.execute(
                f"INSERT INTO {self._table} ({cols_fmt}) VALUES ({vals_fmt})",
                tuple(row.values()),
            )
        self._conn.commit()

    @override
    def close(self) -> None:
        self._conn.close()
```
What’s Happening Here?
- Initialization: The partition initializes with a MySQL connection (used to connect to SingleStore) and the table name where data will be written.
- Batch Writing: The `write_batch` method iterates over each row in the batch, dynamically constructing the column list and placeholders (`?`). It then executes the insert command for each row and commits the transaction after processing the batch (a batched variant is sketched after this list).
- Cleanup: The `close` method ensures that the database connection is properly closed when the partition is shut down.
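Because `write_batch` already receives the whole batch, one possible optimization is to group the rows into a single multi-row insert instead of issuing one statement per row. This is only a sketch, not part of the sink above; it assumes every row in a batch shares the same columns and uses the connector's standard `%s` placeholder style:

```python
# Sketch of a batched write_batch variant (assumes all rows in a batch have
# the same columns). Uses mysql-connector-python's standard %s placeholders.
def write_batch(self, items: List[Row]) -> None:
    if not items:
        return
    cols = list(items[0].keys())
    cols_fmt = ", ".join(cols)
    vals_fmt = ", ".join(["%s"] * len(cols))
    cur = self._conn.cursor()
    cur.executemany(
        f"INSERT INTO {self._table} ({cols_fmt}) VALUES ({vals_fmt})",
        [tuple(row[c] for c in cols) for row in items],
    )
    self._conn.commit()
    cur.close()
```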
2. Defining the Dynamic Sink
Next, we implement the SingleStoreTableSink
that uses our partition. The dynamic sink is responsible for creating partitions based on the worker context.
```python
from typing import Dict, Optional

from bytewax.outputs import DynamicSink


class SingleStoreTableSink(DynamicSink[Row]):
    def __init__(
        self,
        user: str,
        password: str,
        host: str,
        database: str,
        table: str,
        add_config: Optional[Dict] = None,
    ) -> None:
        self._user = user
        self._password = password
        self._host = host
        self._database = database
        self._table = table
        self._add_config = {} if add_config is None else add_config

    @override
    def build(
        self,
        step_id: str,
        worker_index: int,
        worker_count: int,
    ) -> _SingleStoreTableSinkPartition:
        # One connection per worker; each worker writes through its own partition.
        conn = MySQLConnection(
            user=self._user,
            password=self._password,
            host=self._host,
            database=self._database,
            **self._add_config,
        )
        return _SingleStoreTableSinkPartition(conn, self._table)
```
Key Points:
- Initialization: We pass in the connection details for SingleStore (user, password, host, database, and target table). An optional configuration dictionary (`add_config`) lets you supply any additional parameters required by the `MySQLConnection` (a usage example follows this list).
- Building Partitions: The `build` method creates a new connection and returns an instance of our sink partition. This method is called for each worker, ensuring that every worker in your Bytewax dataflow has its own dedicated connection to SingleStore.
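For example, here is how the sink might be instantiated with a couple of extra connection options passed through `add_config`. All values are placeholders; `port` and `connection_timeout` are standard `mysql-connector-python` options:

```python
# Illustrative instantiation; hostname, credentials, and database are placeholders.
sink = SingleStoreTableSink(
    user="dataflow-test",
    password="...",  # load secrets from the environment in real deployments
    host="svc-example.aws-virginia-6.svc.singlestore.com",
    database="example_db",
    table="metrics",
    # Extra keyword arguments forwarded verbatim to MySQLConnection.
    add_config={"port": 3333, "connection_timeout": 10},
)
```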
3. Integrating with a Bytewax Dataflow
Now that we have our sink defined, let’s see how it fits into a Bytewax dataflow that reads from Kafka, processes messages, and writes the results to SingleStore.
flow = Dataflow("ss_test")
# Create a Kafka input source.
k_msgs = opk.input("kafka_in", flow, brokers=["localhost:1234"], topics=["test"])
# Define the Avro schema for deserialization.
schema = """
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
"""
# Deserialize Kafka messages using Avro.
deser_msgs = opk.deserialize_value("deser", k_msgs.oks, PlainAvroDeserializer(schema))
# Format the message to a dictionary (Row).
def fmtter(msg: KafkaSourceMessage[Optional[bytes], object]) -> Row:
return cast(Row, msg.value)
prep_inserts = op.map("prep", deser_msgs.oks, fmtter)
# Set up the SingleStore sink with your connection details.
ss_sink = SingleStoreTableSink(
user="dataflow-test",
password="...",
host="svc-3482219c-a389-4079-b18b-d50662524e8a-shared-dml.aws-virginia-6.svc.singlestore.com:3333",
database="db_david_65f37",
table="metrics",
)
# Attach the sink to the dataflow.
op.output("ss_out", prep_inserts, ss_sink)
What’s Happening in the Dataflow?
- Kafka Source: We create a Kafka input source using Bytewax’s Kafka connector, specifying the broker and topic.
- Deserialization: We define an Avro schema for our messages and deserialize them with `PlainAvroDeserializer`. This converts raw Kafka messages into structured Python dictionaries (only the successfully decoded messages on `.oks` flow onward; see the note after this list about the error streams).
- Mapping: The `fmtter` function converts the deserialized message into the row format (a dictionary) expected by our sink.
- Output: Finally, we create an instance of our `SingleStoreTableSink` with the necessary connection parameters and connect it as the output operator of the dataflow.
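One detail worth calling out: both `opk.input` and `opk.deserialize_value` split their output into `.oks` and `.errs` streams, and the dataflow above only consumes `.oks`. A minimal sketch (step IDs are illustrative) of surfacing the error streams so malformed messages don't disappear silently:

```python
# Log Kafka source errors and deserialization failures instead of dropping them.
# Step IDs are illustrative; in production you might route these to a dead-letter topic.
op.inspect("kafka_errors", k_msgs.errs)
op.inspect("deser_errors", deser_msgs.errs)
```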
Running the Integration
To test the integration:
Set Up Your Environment:
- Ensure your Kafka cluster is running and accessible.
- Confirm that your SingleStore database is up and running, and that the target table (`metrics` in this example) exists; a minimal table-creation sketch follows this list.
- Install the necessary Python packages (`bytewax`, `mysql-connector-python`, etc.).
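If the `metrics` table doesn't exist yet, a one-off script like the following can create it. The column types here are an assumption derived from the Avro schema used in this example; adjust them (and the placeholder connection details) to your own data:

```python
# One-off setup: create the target table before running the dataflow.
# Connection values are placeholders; column types are inferred from the
# example Avro schema (name, favorite_number, favorite_color).
from mysql.connector import MySQLConnection

conn = MySQLConnection(
    user="dataflow-test",
    password="...",
    host="svc-example.aws-virginia-6.svc.singlestore.com",
    port=3333,
    database="example_db",
)
cur = conn.cursor()
cur.execute(
    """
    CREATE TABLE IF NOT EXISTS metrics (
        name TEXT NOT NULL,
        favorite_number INT,
        favorite_color TEXT
    )
    """
)
conn.commit()
cur.close()
conn.close()
```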
Configure Connection Details:
- Update the SingleStore connection parameters (user, password, host, database, table) in the `SingleStoreTableSink` instantiation.
- Adjust the Kafka brokers and topics as needed.
Run Your Dataflow:
- Execute your Bytewax dataflow script (a quick way to run it locally is sketched below). You should see your data streaming from Kafka, processed by Bytewax, and then inserted into your SingleStore table.
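For a quick local test, you can run the flow in-process with Bytewax's `run_main` helper; for real deployments you would typically launch it with the `python -m bytewax.run` entry point instead. A minimal sketch, assuming your dataflow script defines `flow` as above:

```python
# Quick single-worker, in-process run; handy for local testing.
from bytewax.testing import run_main

run_main(flow)
```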
Conclusion
Integrating Bytewax with SingleStore enables efficient real-time data streaming from Kafka while maintaining flexibility. By leveraging `DynamicSink` and `StatelessSinkPartition`, this approach streamlines data ingestion, batch processing, and connection management.
To optimize performance in production, fine-tuning batch sizes, indexing SingleStore effectively, and implementing error handling are essential. This integration also provides a foundation for adapting Bytewax to other database sinks, making it a practical choice for real-time data workflows.
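As a starting point for error handling, here is a sketch (not part of the sink above) of a `write_batch` that rolls back a failed batch instead of leaving the transaction half-applied:

```python
# Sketch: roll back the whole batch if any insert fails, then re-raise so the
# failure is visible to the dataflow instead of being silently swallowed.
def write_batch(self, items: List[Row]) -> None:
    cur = self._conn.cursor()
    try:
        for row in items:
            cols_fmt = ", ".join(row.keys())
            vals_fmt = ", ".join(["%s"] * len(row))  # connector's standard placeholder style
            cur.execute(
                f"INSERT INTO {self._table} ({cols_fmt}) VALUES ({vals_fmt})",
                tuple(row.values()),
            )
        self._conn.commit()
    except Exception:
        self._conn.rollback()
        raise
    finally:
        cur.close()
```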
We hope this tutorial helps you get started with your own integrations. If you have any questions, feedback, or want to share your experiences, feel free to comment below or reach out on our community channels.
Happy streaming! 💛💛
For more guides, tutorials, and updates from Bytewax, be sure to subscribe to our blog and follow us on social media. ➡️ X, LinkedIn, ⭐️ GitHub