Building a ClickHouse Sink for Bytewax to Enable Real-Time Analytics

By Zander Matheson

This blog post discusses the latest connector module available for Bytewax, the ClickHouse Sink. This sink enables users to seamlessly write data from Bytewax into ClickHouse, a high-performance columnar database management system for real-time analytics. We'll explain how the ClickHouse Sink works and how you can use it with your current pipelines.

Overview and Utilization of the ClickHouse Sink

The ClickHouse Sink for Bytewax is designed to facilitate the efficient transfer of data from Bytewax to a ClickHouse database. It leverages PyArrow tables to manage data schemas and ensures eventual consistency based on specified keys.

This integration allows for robust data processing and storage solutions, making it easier to analyze and visualize large datasets in real time.

Installation

To get started, you need to install the bytewax-clickhouse package. You can do this via pip:

pip install bytewax-clickhouse

The package is available on GitHub if you want to view the code.

Setting Up Your Schema and Order By Clause

First, you will need to define the ClickHouse schema string and an ORDER BY clause for your ClickHouse table. The schema specifies the structure of the data, while the ORDER BY clause determines how rows are sorted and organized within ClickHouse.

CH_SCHEMA = """
        metric String,
        value Float64,
        ts DateTime,
        """

ORDER_BY = "metric, ts"
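
With these two pieces, the sink can create the table for you if it does not already exist. Using the database and table names from the example later in this post (bytewax and metrics), the generated DDL is roughly:

CREATE TABLE bytewax.metrics (
    metric String,
    value Float64,
    ts DateTime
) ENGINE = ReplacingMergeTree()
ORDER BY tuple(metric, ts);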

Next, you will need to define a PyArrow schema, which will be used to convert your records into Arrow format.

import pyarrow as pa

PA_SCHEMA = pa.schema(
    [
        ("metric", pa.string()),
        ("value", pa.float64()),
        ("ts", pa.timestamp("us")),  # microsecond
    ]
)
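
Each record that reaches the sink is expected to be a (key, value) pair whose value lines up with this PyArrow schema, one element per column. As a hypothetical example (the full dataflow later in this post builds records of exactly this shape):

from datetime import datetime

# ("routing_key", (metric, value, ts)) -- the inner tuple matches the PA_SCHEMA column order
record = ("All", ("a_metric", 42.0, datetime.now()))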

Writing Data to ClickHouse with the ClickHouse Operator

Writing data to ClickHouse is straightforward: simply import the operators module and specify your parameters.

from bytewax.clickhouse import operators as chop

chop.output(
    "output_clickhouse",
    metrics,
    "metrics",
    "admin",
    "password",
    database="bytewax",
    port=8123,
    ch_schema=CH_SCHEMA,
    order_by=ORDER_BY,
    pa_schema=PA_SCHEMA,
    timeout=timedelta(seconds=1),
    max_size=10,
)
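
Two parameters worth highlighting are timeout and max_size. They control batching: records are collected per key until either max_size records have accumulated or the timeout elapses, and each batch is then converted to an Arrow table and written to ClickHouse. (As shown in the operator code later in this post, these values are passed straight through to op.collect.)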

Complete Example: Testing the Sink

To test the ClickHouse Sink, you can create a sample data flow that generates random metrics and writes them to ClickHouse.

from datetime import timedelta, datetime

import pyarrow as pa

from bytewax import operators as op
from bytewax.connectors.demo import RandomMetricSource
from bytewax.dataflow import Dataflow

from bytewax.clickhouse import operators as chop

CH_SCHEMA = """
        metric String,
        value Float64,
        ts DateTime,
        """

ORDER_BY = "metric, ts"

PA_SCHEMA = pa.schema(
    [
        ("metric", pa.string()),
        ("value", pa.float64()),
        ("ts", pa.timestamp("us")),  # microsecond
    ]
)


flow = Dataflow("test_ch")

# Build a sample stream of metrics
metrica = op.input("inp_a", flow, RandomMetricSource("a_metric"))
metricb = op.input("inp_b", flow, RandomMetricSource("b_metric"))
metricc = op.input("inp_c", flow, RandomMetricSource("c_metric"))
metrics = op.merge("merge", metrica, metricb, metricc)
metrics = op.map("add_time", metrics, lambda x: x + tuple([datetime.now()]))
metrics = op.map("add_key", metrics, lambda x: ("All", x))
op.inspect("metrics", metrics)


chop.output(
    "output_clickhouse",
    metrics,
    "metrics",
    "admin",
    "password",
    database="bytewax",
    port=8123,
    ch_schema=CH_SCHEMA,
    order_by=ORDER_BY,
    pa_schema=PA_SCHEMA,
    timeout=timedelta(seconds=1),
    max_size=10,
)

Before running the test example, you will need to have ClickHouse running and change the parameters in chop.output to connect to it.
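
You can run ClickHouse however you prefer (a local Docker container works well). Before launching the dataflow, it can be handy to sanity-check your connection details with the clickhouse-connect client, which the sink code below appears to use via get_client and insert_arrow; the host, port, and credentials here are placeholders that should match whatever you pass to chop.output:

from clickhouse_connect import get_client

# Connect with the same details you pass to chop.output
client = get_client(
    host="localhost",
    port=8123,
    username="admin",
    password="password",
)
print(client.command("SELECT version()"))  # prints the ClickHouse version if the connection works
client.close()

Assuming the example above is saved as dataflow.py, you can then run it with the Bytewax runner, e.g. python -m bytewax.run dataflow:flow.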

Behind the Scenes: ClickHouse Connector Code

The ClickHouse Sink is powered by a custom sink and operator that handle converting the data into Arrow format, managing the connection to ClickHouse, and inserting the data. Here's a brief look at the key components:

  1. _ClickHousePartition: Manages the connection to ClickHouse and writes batches of data.

  2. ClickHouseSink: Initializes the connection, checks table existence, and ensures the schema is set up correctly.

  3. bytewax.clickhouse.operators.output: Collects data into batches and creates the Arrow table.

ClickHouseSink

The crux of the ClickHouseSink is in its initialization. Here we connect to the database, verify that things are set up correctly, and create the table if necessary.

        # init client
        if not self.database:
            logger.warning("database not set, using 'default'")
            self.database = "default"
        client = get_client(
            host=self.host,
            port=self.port,
            username=self.username,
            password=self.password,
            database=self.database,
        )

        # Check if the table exists
        table_exists_query = f"EXISTS {self.database}.{self.table_name}"
        table_exists = client.command(table_exists_query)
        if not table_exists:
            logger.info(
                f"""Table '{self.table_name}' does not exist.
                        Attempting to create with provided schema"""
            )
            if self.schema:
                # Create the table with ReplacingMergeTree
                create_table_query = f"""
                CREATE TABLE {self.database}.{self.table_name} (
                    {self.schema}
                ) ENGINE = ReplacingMergeTree()
                ORDER BY tuple({self.order_by});
                """
                client.command(create_table_query)
                logger.info(f"Table '{self.table_name}' created successfully.")
            else:
                msg = """Bad Schema. Can't complete execution without schema of format
                        column1 UInt32,
                        column2 String,
                        column3 Date"""
                raise ValueError(msg)
        else:
            logger.info(f"Table '{self.table_name}' exists.")

            # Check the MergeTree type
            mergetree_type_query = f"""SELECT engine FROM system.tables
                    WHERE database = '{self.database}' AND name = '{self.table_name}'"""
            mergetree_type = client.command(mergetree_type_query)
            logger.info(f"MergeTree type of the table '{self.table_name}': {mergetree_type}")

            if "ReplacingMergeTree" not in mergetree_type:
                logger.warning(
                    f"""Table '{self.table_name}' is not using ReplacingMergeTree.
                    Consider modifying the table to avoid performance degradation
                    and/or duplicates on restart"""
                )

            # Get the table schema
            schema_query = f"""
            SELECT name, type FROM system.columns
            WHERE database = '{self.database}' AND table = '{self.table_name}'
            """
            schema_result = client.query(schema_query)
            columns = schema_result.result_rows
            logger.info(f"Schema of the table '{self.table_name}':")
            for column in columns:
                logger.info(f"Column: {column[0]}, Type: {column[1]}")

        client.close()

One thing to call out in this code is the ReplacingMergeTree engine that we use. ClickHouse has many different table engines with different strengths; this one is selected for its ability to ingest a lot of data as fast as possible. The tradeoff is the timing of consistency: duplicate rows are only collapsed when background merges run, so it is worth thinking about how the table will be queried downstream to minimize the duplicate data seen.
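
If you need fully deduplicated results before a merge has happened, ClickHouse lets you force the merge at read time with the FINAL modifier. A query against the example table from earlier would look roughly like this (note that FINAL does extra work at query time and can be noticeably slower on large tables):

SELECT metric, ts, value
FROM bytewax.metrics FINAL
ORDER BY metric, ts;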

ClickHousePartition

For each worker, a _ClickHousePartition is built, and this is what controls how data is written. Outside of the connection boilerplate, it is really straightforward.

    def write_batch(self, batch: List[Table]) -> None:
        arrow_table = concat_tables(batch)
        self.client.insert_arrow(
            f"{self.database}.{self.table_name}",
            arrow_table,
            settings={"buffer_size": 0},
        )

We use the client to insert the Arrow table. The important parameter to call out is buffer_size, which is set to 0: Bytewax does its own buffering, so disabling the client-side buffer keeps us as confident as possible about what has actually been written successfully.

ClickHouse Output Operator

To make the sink as straightforward as possible to use, a custom operator was developed that batches up records, converts them to Arrow format, and then calls the sink. The interesting work lies in the code below, where we collect the records into batches and map each batch to an Arrow table.

    def shim_mapper(key__batch: Tuple, pa_schema) -> pa.Table:
        key, batch = key__batch
        # Transpose the batch of row tuples into per-column lists
        columns = list(zip(*batch))
        arrays = []
        for i, f in enumerate(pa_schema):
            array = pa.array(columns[i], f.type)
            arrays.append(array)
        t = pa.Table.from_arrays(arrays, schema=pa_schema)

        return t

    return op.collect("batch", up, timeout=timeout, max_size=max_size).then(
        op.map, "map", lambda x: shim_mapper(x, pa_schema)
    )
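
To see what this does to a batch, here is a small standalone sketch (with made-up values) of the same transpose-and-convert step:

from datetime import datetime

import pyarrow as pa

PA_SCHEMA = pa.schema(
    [
        ("metric", pa.string()),
        ("value", pa.float64()),
        ("ts", pa.timestamp("us")),
    ]
)

# A collected batch is a list of (metric, value, ts) tuples
batch = [
    ("a_metric", 1.0, datetime(2024, 1, 1, 12, 0, 0)),
    ("b_metric", 2.5, datetime(2024, 1, 1, 12, 0, 1)),
]

# zip(*batch) turns the rows into columns, one per schema field
columns = list(zip(*batch))
arrays = [pa.array(columns[i], f.type) for i, f in enumerate(PA_SCHEMA)]
table = pa.Table.from_arrays(arrays, schema=PA_SCHEMA)
print(table)  # a 2-row Arrow table, ready to be passed to insert_arrow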

Conclusion

The ClickHouse Sink for Bytewax provides a powerful tool for integrating your data processing workflows with ClickHouse. By following the steps outlined in this blog post, you can quickly set up and start using the ClickHouse Sink to enhance your data analytics capabilities.

Looking for more connectors? We'd love to hear from you in our Slack channel or through the connector form on our website. 🙂

Zander Matheson

CEO, Founder
Zander is a seasoned data engineer who has founded and currently helms Bytewax. Zander has worked in the data space since 2014 at Heroku, GitHub, and an NLP startup. Before that, he attended business school at the UT Austin and HEC Paris in Europe.