This blog post discusses the latest connector module available for Bytewax, the ClickHouse Sink. This sink enables users to seamlessly write data from Bytewax into ClickHouse, a high-performance columnar database management system for real-time analytics. We'll explain how the ClickHouse Sink works and how you can use it with your current pipelines.
Overview and Utilization of the ClickHouse Sink
The ClickHouse Sink for Bytewax is designed to facilitate the efficient transfer of data from Bytewax to a ClickHouse database. It leverages PyArrow tables to manage data schemas and ensures eventual consistency based on specified keys.
This integration allows for robust data processing and storage solutions, making it easier to analyze and visualize large datasets in real time.
Installation
To get started, you need to install the bytewax-clickhouse
package. You can do this via pip:
pip install bytewax-clickhouse
The source code is available on GitHub if you'd like to dig into the implementation.
Setting Up Your Schema and Order By Clause
First, you will need to define a ClickHouse schema string and an ORDER BY clause for your ClickHouse table. The schema specifies the structure of the data, while the ORDER BY clause determines how data is organized within ClickHouse.
CH_SCHEMA = """
metric String,
value Float64,
ts DateTime,
"""
ORDER_BY = "metric, ts"
Next, define a PyArrow schema, which will be used to convert your records into Arrow format.
import pyarrow as pa

PA_SCHEMA = pa.schema(
    [
        ("metric", pa.string()),
        ("value", pa.float64()),
        ("ts", pa.timestamp("us")),  # microsecond precision
    ]
)
Writing Data to ClickHouse with the ClickHouse Operator
Writing data to ClickHouse is straightforward: simply import the operator and specify your parameters.
from datetime import timedelta

from bytewax.clickhouse import operators as chop

chop.output(
    "output_clickhouse",       # step ID
    metrics,                   # upstream keyed stream of records
    "metrics",                 # table name
    "admin",                   # username
    "password",                # password
    database="bytewax",
    port=8123,
    ch_schema=CH_SCHEMA,
    order_by=ORDER_BY,
    pa_schema=PA_SCHEMA,
    timeout=timedelta(seconds=1),
    max_size=10,
)
Complete Example: Testing the Sink
To test the ClickHouse Sink, you can create a sample data flow that generates random metrics and writes them to ClickHouse.
from datetime import timedelta, datetime

import pyarrow as pa

from bytewax import operators as op
from bytewax.connectors.demo import RandomMetricSource
from bytewax.dataflow import Dataflow
from bytewax.clickhouse import operators as chop

CH_SCHEMA = """
metric String,
value Float64,
ts DateTime,
"""

ORDER_BY = "metric, ts"

PA_SCHEMA = pa.schema(
    [
        ("metric", pa.string()),
        ("value", pa.float64()),
        ("ts", pa.timestamp("us")),  # microsecond precision
    ]
)
flow = Dataflow("test_ch")
# Build a sample stream of metrics
metrica = op.input("inp_a", flow, RandomMetricSource("a_metric"))
metricb = op.input("inp_b", flow, RandomMetricSource("b_metric"))
metricc = op.input("inp_c", flow, RandomMetricSource("c_metric"))
metrics = op.merge("merge", metrica, metricb, metricc)
metrics = op.map("add_time", metrics, lambda x: x + tuple([datetime.now()]))
metrics = op.map("add_key", metrics, lambda x: ("All", x))
op.inspect("metrics", metrics)
chop.output(
    "output_clickhouse",
    metrics,
    "metrics",
    "admin",
    "password",
    database="bytewax",
    port=8123,
    ch_schema=CH_SCHEMA,
    order_by=ORDER_BY,
    pa_schema=PA_SCHEMA,
    timeout=timedelta(seconds=1),
    max_size=10,
)
Before running the test example, you will need a running ClickHouse instance, and you will need to update the parameters passed to chop.output so they point to it.
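If you don't have a ClickHouse instance handy, the official clickhouse/clickhouse-server Docker image exposes the HTTP interface on port 8123 (the port the sink connects to), and the CLICKHOUSE_USER, CLICKHOUSE_PASSWORD, and CLICKHOUSE_DB environment variables let you match the credentials and database used above. As a quick sanity check before launching the dataflow, here is a small sketch, assuming those same example credentials, that uses clickhouse-connect (the client library the sink itself uses) to verify the connection:
# Sanity check: confirm ClickHouse is reachable before running the dataflow.
# Host, port, and credentials are the example values used in this post; the
# "bytewax" database must already exist (e.g. created via CLICKHOUSE_DB).
from clickhouse_connect import get_client

client = get_client(
    host="localhost",
    port=8123,
    username="admin",
    password="password",
    database="bytewax",
)
print(client.command("SELECT version()"))  # prints the server version string
client.close()
Once ClickHouse is reachable, you can run the example with Bytewax's runner, e.g. python -m bytewax.run dataflow:flow if you saved the example as dataflow.py.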
Behind the Scenes: ClickHouse Connector Code
The ClickHouse Sink is powered by a custom sink and operator that handle converting the data into Arrow format, connecting to ClickHouse, and inserting the data. Here's a brief look at the key components:
_ClickHousePartition: Manages the connection to ClickHouse and writes batches of data.
ClickHouseSink: Initializes the connection, checks that the table exists, and ensures the schema is set up correctly.
bytewax.clickhouse.operators.output: Collects data into batches and creates the Arrow table.
ClickHouseSink
The crux of the ClickHouseSink is its initialization. Here we connect to the database, verify that things are configured correctly, and create the table if necessary.
# init client
if not self.database:
    logger.warning("database not set, using 'default'")
    self.database = "default"

client = get_client(
    host=self.host,
    port=self.port,
    username=self.username,
    password=self.password,
    database=self.database,
)

# Check if the table exists
table_exists_query = f"EXISTS {self.database}.{self.table_name}"
table_exists = client.command(table_exists_query)
if not table_exists:
    logger.info(
        f"""Table '{self.table_name}' does not exist.
        Attempting to create with provided schema"""
    )
    if self.schema:
        # Create the table with ReplacingMergeTree
        create_table_query = f"""
        CREATE TABLE {self.database}.{self.table_name} (
        {self.schema}
        ) ENGINE = ReplacingMergeTree()
        ORDER BY tuple({self.order_by});
        """
        client.command(create_table_query)
        logger.info(f"Table '{self.table_name}' created successfully.")
    else:
        msg = """Bad Schema. Can't complete execution without schema of format
        column1 UInt32,
        column2 String,
        column3 Date"""
        raise ValueError(msg)
else:
    logger.info(f"Table '{self.table_name}' exists.")
    # Check the MergeTree type
    mergetree_type_query = f"""SELECT engine FROM system.tables
    WHERE database = '{self.database}' AND name = '{self.table_name}'"""
    mergetree_type = client.command(mergetree_type_query)
    logger.info(f"MergeTree type of the table '{self.table_name}': {mergetree_type}")
    if "ReplacingMergeTree" not in mergetree_type:
        logger.warning(
            f"""Table '{self.table_name}' is not using ReplacingMergeTree.
            Consider modifying the table to avoid performance degradation
            and/or duplicates on restart"""
        )

# Get the table schema
schema_query = f"""
SELECT name, type FROM system.columns
WHERE database = '{self.database}' AND table = '{self.table_name}'
"""
schema_result = client.query(schema_query)
columns = schema_result.result_rows
logger.info(f"Schema of the table '{self.table_name}':")
for column in columns:
    logger.info(f"Column: {column[0]}, Type: {column[1]}")

client.close()
One thing to call out in this code is the ReplacingMergeTree engine. ClickHouse offers many table engines, each with different strengths; ReplacingMergeTree was chosen for its ability to ingest large volumes of data as quickly as possible. The tradeoff is in the timing of consistency: duplicate rows are only collapsed when background merges run. It is worth thinking about how you consume the table downstream so you minimize the duplicate data you see.
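One way to handle this on the query side is ClickHouse's FINAL modifier, which collapses the duplicate rows that ReplacingMergeTree has not yet merged, at the cost of slower reads. Here is a sketch using clickhouse-connect and the table name and credentials from the example above:
from clickhouse_connect import get_client

# Connection details are the example values used earlier in this post.
client = get_client(
    host="localhost",
    port=8123,
    username="admin",
    password="password",
    database="bytewax",
)

# FINAL forces deduplication at read time instead of waiting for background merges.
result = client.query(
    "SELECT metric, value, ts FROM metrics FINAL ORDER BY ts DESC LIMIT 10"
)
for metric, value, ts in result.result_rows:
    print(metric, value, ts)

client.close()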
ClickHousePartition
A _ClickHousePartition instance is created for each worker, and it controls how data is written. Outside of the connection boilerplate, it is quite straightforward.
def write_batch(self, batch: List[Table]) -> None:
    arrow_table = concat_tables(batch)
    self.client.insert_arrow(
        f"{self.database}.{self.table_name}",
        arrow_table,
        settings={"buffer_size": 0},
    )
We use the client to insert the Arrow table. The important parameter to call out is buffer_size set to 0. Bytewax does its own buffering, so disabling client-side buffering lets us track what has been seen and be as confident as possible about what has been successfully written.
ClickHouse Output Operator
To make the sink as straightforward as possible to use, a custom operator was developed that batches up records, converts them to Arrow format, and then calls the sink. The interesting work lies in the code below, where we collect the records and build the Arrow table.
def shim_mapper(key__batch: Tuple, pa_schema) -> pa.Table:
    key, batch = key__batch
    # Transpose the list of row tuples into per-column value lists.
    columns = list(zip(*batch))
    arrays = []
    for i, f in enumerate(pa_schema):
        array = pa.array(columns[i], f.type)
        arrays.append(array)
    # Assemble the columns into a single Arrow table matching the schema.
    t = pa.Table.from_arrays(arrays, schema=pa_schema)
    return t


return op.collect("batch", up, timeout=timeout, max_size=max_size).then(
    op.map, "map", lambda x: shim_mapper(x, pa_schema)
)
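To make that concrete, here is a minimal standalone sketch of the same transformation applied to one collected batch; the sample values are made up, and PA_SCHEMA is the schema defined earlier in this post:
from datetime import datetime

import pyarrow as pa

PA_SCHEMA = pa.schema(
    [
        ("metric", pa.string()),
        ("value", pa.float64()),
        ("ts", pa.timestamp("us")),
    ]
)

# One collected batch of (metric, value, ts) tuples, as produced by the example flow.
batch = [
    ("a_metric", 1.0, datetime(2024, 1, 1, 12, 0, 0)),
    ("b_metric", 2.5, datetime(2024, 1, 1, 12, 0, 1)),
]

columns = list(zip(*batch))  # transpose rows into columns
arrays = [pa.array(col, f.type) for col, f in zip(columns, PA_SCHEMA)]
table = pa.Table.from_arrays(arrays, schema=PA_SCHEMA)
print(table)
Each Arrow table produced this way is what _ClickHousePartition later concatenates and inserts in write_batch.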
Conclusion
The ClickHouse Sink for Bytewax provides a powerful tool for integrating your data processing workflows with ClickHouse. By following the steps outlined in this blog post, you can quickly set up and start using the ClickHouse Sink to enhance your data analytics capabilities.
Looking for more connectors? We'd love to hear from you in our Slack channel or through the connector form on our website. 🙂