Two Powerful Ways to Ingest Streaming Data into Snowflake

By Zander Matheson

With Snowflake Summit top of mind right now, it is fitting to look at how Bytewax and Snowflake interoperate.

Ingesting continuously generated data is an often overlooked side of data engineering. It is difficult to do well, nobody will thank you for getting it right, and everyone will know if you get it wrong. We looked at this previously in our blog post "Data Pipelines - Streams to Parquet", which explores using Python to ingest data into the common Parquet format.

With Snowflake, you can ingest data in a few different ways. In this post we will showcase two of these:

  • Snowpipe Streaming: A recent addition to Snowflake's product portfolio, and
  • Bytewax: Using a custom connector and leveraging the table stream functionality used internally by Snowflake.

15-year-old me would be so stoked to have a snow pipe! I am not sure what it is, but the idea of delivering snow anywhere sounds pretty awesome! I was a big skier, and this sounds like a fantasy.

1. Ingesting Data with Snowpipe Streaming

Snowpipe Streaming is a streaming API that is part of Snowflake's Snowpipe offering. Snowpipe broadly provides a continuous data ingestion service that automates the process of loading data. It's designed for real-time or near-real-time data ingestion, so your data warehouse stays up to date with minimal effort.

While Snowpipe allows you to automatically load files to Snowflake, the streaming API is focused on row-level ingestion.

Features of Snowpipe Streaming:

  • Reliable: Snowpipe Streaming is a managed service with high availability. It also comes with ordering guarantees and the building blocks needed to engineer exactly-once pipelines.
  • Scalable: Snowpipe Streaming is a serverless offering and the scaling mechanism is handled for you.
  • Real-time Ingestion: Low latency and high throughput for real-time ingestion.

How to Use Snowpipe Streaming:

To get started using the Snowpipe Streaming SDK, you need to have Java installed. If that doesn't work for you, scroll down to the Bytewax overview. The learning materials on the Snowflake Developer Website have an example.

The most likely way you will use Snowpipe Streaming is through Kafka Connect with the Snowflake Connector for Kafka, which uses Snowpipe Streaming. However, you can also use the Java SDK directly, which requires a bit more in-depth understanding of how things work. We will unpack the key pieces below.

Channels

Snowpipe Streaming has the concept of channels, which are like partitions in Kafka. The documentation explains the limitations of channels:

A single channel maps to exactly one table in Snowflake; however, multiple channels can point to the same table. The Client SDK can open multiple channels to multiple tables; however the SDK cannot open channels across accounts. The ordering of rows and their corresponding offset tokens are preserved within a channel but not across channels that point to the same table.

When designing an ingestion application with Snowpipe Streaming, you can use channels to parallelize the load, for example by mapping each channel to a Kafka partition or a file. This is similar to how you would use partitions in a Bytewax input connector with the input API.
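As a rough illustration of that mapping, here is a minimal pure-Python sketch. It does not use the Snowpipe Streaming SDK (which is Java); the `channel_name` helper and the record tuples are hypothetical and only show how routing one partition to one channel keeps per-partition ordering.

```python
from collections import defaultdict


def channel_name(topic, partition):
    """Hypothetical naming scheme: one channel per topic partition."""
    return f"{topic}_{partition}"


# (topic, partition, value) tuples standing in for consumed records.
records = [("orders", 0, "a"), ("orders", 1, "b"), ("orders", 0, "c")]

# Each list stands in for the rows sent through one channel, in order.
channels = defaultdict(list)
for topic, partition, value in records:
    channels[channel_name(topic, partition)].append(value)
```

Rows from partition 0 stay in order relative to each other, but nothing is promised about their order relative to partition 1, which matches the per-channel ordering guarantee quoted above.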

Offset Tokens

The next core component of Snowpipe Streaming, which enables exactly-once guarantees, is the offset token mechanism.
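The idea can be sketched in a few lines of Python. This is a simulation, not the SDK: `FakeChannel` and `replay` are hypothetical names, and integer offsets are used for simplicity. The assumption is that rows and their offset token are committed together, and that after a restart the client asks the channel for the last committed token and skips anything at or before it.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FakeChannel:
    """Hypothetical in-memory stand-in for a Snowpipe Streaming channel."""

    name: str
    rows: list = field(default_factory=list)
    latest_offset_token: Optional[int] = None  # last committed offset

    def insert_rows(self, rows, offset_token):
        # Rows and the offset token are committed together.
        self.rows.extend(rows)
        self.latest_offset_token = offset_token


def replay(channel, source):
    """Send (offset, row) pairs, skipping offsets that are already committed."""
    committed = channel.latest_offset_token
    for offset, row in source:
        if committed is not None and offset <= committed:
            continue  # already in the table; resending would duplicate it
        channel.insert_rows([row], offset_token=offset)


source = [(0, {"id": 1}), (1, {"id": 2}), (2, {"id": 3})]
channel = FakeChannel("partition-0")

replay(channel, source[:2])  # first run stops after offset 1 (simulated crash)
replay(channel, source)      # the restart re-reads the source from the beginning
```

Even though the second run re-reads every record, only offset 2 is sent, so each row lands in the table once.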

In both instances, the most likely way you will ingest data into Snowflake is to buffer a number of events and then write them as a batch.
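A minimal sketch of that buffering pattern, assuming a size-based flush only: the `batches` helper is hypothetical, and the `written` list stands in for the actual write to Snowflake.

```python
def batches(events, max_size):
    """Yield lists of up to max_size events, flushing the remainder at the end."""
    buffer = []
    for event in events:
        buffer.append(event)
        if len(buffer) >= max_size:
            yield buffer
            buffer = []
    if buffer:
        yield buffer


written = []  # stands in for the write to Snowflake
for batch in batches(range(7), max_size=3):
    written.append(list(batch))
```

A real pipeline would usually also flush on a timeout so that a slow stream does not hold events in the buffer indefinitely.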

2. Ingesting Data with Bytewax

While Snowpipe Streaming excels at ingesting data from Kafka, Bytewax offers a more versatile approach, allowing you to ingest streaming data from a wide variety of sources into Snowflake. Bytewax is a powerful tool for real-time data processing, capable of handling complex transformations and integrations with diverse data sources.

Key Features of Bytewax:

  • Simplicity: Simple Python API makes it easy to get started.
  • Flexibility: Bytewax can connect to various data sources, including databases, message queues, APIs, and more.
  • Real-time Processing: It processes data in real-time, enabling instant insights and actions.
  • Customizable Pipelines: Bytewax allows you to build custom data pipelines tailored to your specific needs.
  • Scalable: Bytewax can scale across workers to increase throughput.

Bytewax Snowflake Connector

To demonstrate how Bytewax can be used to ingest data into Snowflake, let's dive into the code for a custom Snowflake connector written with the Bytewax output API and explain how it works.

The code below is written to process CDC data and not as a general-purpose Snowflake connector.

The Code

from bytewax.outputs import StatelessSinkPartition, DynamicSink
import snowflake.connector


def create_temp_table(table_data, destination_table, stream_table_schema):
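    """Build the statements that create and populate the temporary stream table.

    Arguments:
    table_data - iterable of change rows (dicts) to insert into the table.
    destination_table - destination table name; the temporary table is
        named temp_<destination_table>_stream.
    stream_table_schema - dict of column name to Snowflake type, including
        the METADATA$ columns.

    Values are interpolated directly into the SQL text, so only pass
    trusted data.
    """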
    columns__types = ", ".join(
        [" ".join(key__value) for key__value in stream_table_schema.items()]
    )
    query_string = f"""CREATE OR REPLACE TABLE temp_{destination_table}_stream ({columns__types});\n"""

    columns = ", ".join(stream_table_schema.keys())
    values = ", ".join(
        [
            "(" + ", ".join([f"'{str(row[x])}'" for x in stream_table_schema.keys() ]) + ")"
            for row in table_data
        ]
    )
    insert_string = f"""INSERT INTO temp_{destination_table}_stream ({columns})
                    VALUES {values};""".replace(
        "'None'", "NULL"
    )
    return (query_string, insert_string)


def merge_table(destination_table, destination_table_schema, primary_key):
    """Create a merge statement to match the change
    data to the destination table data.
    """
    insert = ", ".join(destination_table_schema.keys())
    values = ", ".join([f"S.{x}" for x in destination_table_schema.keys()])
    set_keys = list(destination_table_schema.keys())
    set_keys.remove(primary_key)
    set_statement = ", ".join([f"T.{x} = S.{x}" for x in set_keys])
    merge_statement = f"""MERGE into {destination_table} as T
    using (select *
    from temp_{destination_table}_stream) AS S
    ON T.{primary_key} = S.{primary_key}
    when matched AND S.metadata$action = 'INSERT' AND S.metadata$isupdate
    THEN
    update set {set_statement}
    When matched
    And S.metadata$action = 'DELETE' THEN DELETE
    when not matched
    And S.metadata$action = 'INSERT' THEN
    INSERT ({insert})
    VALUES ({values});"""

    return merge_statement


class SnowflakeSink(DynamicSink):
    def __init__(
        self,
        user,
        password,
        account,
        warehouse,
        database,
        schema,
        source_table_schema,
        primary_key,
        destination_table,
    ):
        self.user = user
        self.password = password
        self.account = account
        self.warehouse = warehouse
        self.database = database
        self.schema = schema
        self.destination_table = destination_table
        self.primary_key = primary_key
        self.destination_table_schema = source_table_schema
        self.stream_table_schema = source_table_schema | {
            "METADATA$ISUPDATE": "BOOLEAN",
            "METADATA$ACTION": "VARCHAR",
        }

    def build(self, _step_id, _worker_index, _worker_count):
        return _SnowflakePartition(
            self.user,
            self.password,
            self.account,
            self.warehouse,
            self.database,
            self.schema,
            self.destination_table,
            self.destination_table_schema,
            self.primary_key,
            self.stream_table_schema,
        )


class _SnowflakePartition(StatelessSinkPartition):
    def __init__(
        self,
        user,
        password,
        account,
        warehouse,
        database,
        schema,
        destination_table,
        destination_table_schema,
        primary_key,
        stream_table_schema,
    ):
        self.conn = snowflake.connector.connect(
            user=user,
            password=password,
            account=account,
            warehouse=warehouse,
            database=database,
            schema=schema,
        )
        self.destination_table = destination_table
        self.destination_table_schema = destination_table_schema
        self.primary_key = primary_key
        self.stream_table_schema = stream_table_schema

        # create destination table if it doesn't exist
        columns__types = ", ".join(
            [
                " ".join(key__value)
                for key__value in self.destination_table_schema.items()
            ]
        )
        create = f"""CREATE TABLE IF NOT EXISTS {self.destination_table} ({columns__types});"""
        self.conn.cursor().execute(create)

    def write_batch(self, batch):
        """Takes the accumulated change data, reduces overwriting statements,
        formats the correct SQL statements and executes them in snowflake.
        """
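        table_data = {}
        # Each item is expected to be a (key, list_of_events) pair. Flatten
        # every item in order so that no item in the batch is dropped.
        events = [event for _key, batch_events in batch for event in batch_events]
        for event in events: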
            if event["op"] == "c":
                # create event
                stream_mods = {
                    "METADATA$ISUPDATE": False,
                    "METADATA$ACTION": "INSERT",
                }
                table_data[event["after"][self.primary_key]] = (
                    event["after"] | stream_mods
                )
            elif event["op"] == "u":
                # update event
                stream_mods = {"METADATA$ISUPDATE": True, "METADATA$ACTION": "INSERT"}
                table_data[event["after"][self.primary_key]] = (
                    event["after"] | stream_mods
                )
            elif event["op"] == "d":
                # delete event
                stream_mods = {"METADATA$ISUPDATE": True, "METADATA$ACTION": "DELETE"}
                table_data[event["before"][self.primary_key]] = (
                    event["before"] | stream_mods
                )

        print(table_data)
        query_string, insert_string = create_temp_table(
            table_data.values(), self.destination_table, self.stream_table_schema
        )
        print(query_string, insert_string)
        self.conn.cursor().execute(query_string)
        self.conn.cursor().execute(insert_string)

        merge = merge_table(
            self.destination_table, self.destination_table_schema, self.primary_key
        )
        self.conn.cursor().execute(merge)

        drop = f"""DROP TABLE temp_{self.destination_table}_stream;"""
        self.conn.cursor().execute(drop)
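One caveat about create_temp_table above: it interpolates values straight into the SQL text, so a value containing a single quote breaks the statement, and a genuine string 'None' is turned into NULL. A hedged alternative is to build a parameterized INSERT and let the driver bind the values. The helper below is a hypothetical sketch, not part of the connector; it assumes the Snowflake Python connector's default pyformat parameter style, where %s marks a bound value.

```python
def build_parameterized_insert(table_data, destination_table, stream_table_schema):
    """Return an INSERT statement with %s placeholders and the rows to bind."""
    columns = list(stream_table_schema.keys())
    placeholders = ", ".join(["%s"] * len(columns))
    sql = (
        f"INSERT INTO temp_{destination_table}_stream "
        f"({', '.join(columns)}) VALUES ({placeholders})"
    )
    # One tuple per row, in the same column order as the statement.
    rows = [tuple(row[col] for col in columns) for row in table_data]
    return sql, rows


schema = {
    "ID": "INTEGER",
    "NAME": "VARCHAR",
    "METADATA$ISUPDATE": "BOOLEAN",
    "METADATA$ACTION": "VARCHAR",
}
data = [
    {
        "ID": 1,
        "NAME": "O'Brien",
        "METADATA$ISUPDATE": False,
        "METADATA$ACTION": "INSERT",
    }
]
sql, rows = build_parameterized_insert(data, "customers", schema)

# With an open connection, the statement would be run as:
#     conn.cursor().executemany(sql, rows)
```

Table and column names still come from the schema dict and are not bound, so they need to be trusted.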

How the Code Works

The code above takes advantage of how Snowflake Table Streams work internally. When you replicate a table in Snowflake, you create a stream of changes that is held in a temporary table. That table is then merged into the existing table, and the action taken depends on the change type (create, update, or delete).

  1. Temporary Table Creation: The create_temp_table function generates a SQL statement to create a temporary table in Snowflake to hold the incoming streaming data.
  2. Data Insertion: The same function also creates a SQL insert statement to populate the temporary table with the streamed data.
  3. Data Merging: The merge_table function creates a merge statement to update, insert, or delete records in the destination table based on the streamed data.
  4. Dynamic Sink: The SnowflakeSink class initializes the connection parameters and table schemas required for data ingestion.
  5. Partition Handling: The _SnowflakePartition class handles the connection to Snowflake, ensures the destination table exists, and manages the batch processing of streamed data, including the creation, insertion, merging, and cleanup of the temporary table.
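To make the batch handling in write_batch more concrete, here is a standalone sketch of its reduction step with no Snowflake connection. The `reduce_changes` function and the sample events are hypothetical; the event shape (op, before, after) follows what the connector code reads.

```python
def reduce_changes(events, primary_key):
    """Keep the latest change per primary key, tagged like a table stream row."""
    latest = {}
    for event in events:
        if event["op"] == "c":
            row, is_update, action = event["after"], False, "INSERT"
        elif event["op"] == "u":
            row, is_update, action = event["after"], True, "INSERT"
        elif event["op"] == "d":
            row, is_update, action = event["before"], True, "DELETE"
        else:
            continue  # other operation types are ignored in this sketch
        # A later event for the same key overwrites the earlier one.
        latest[row[primary_key]] = row | {
            "METADATA$ISUPDATE": is_update,
            "METADATA$ACTION": action,
        }
    return list(latest.values())


events = [
    {"op": "c", "before": None, "after": {"ID": 1, "NAME": "ada"}},
    {"op": "u", "before": {"ID": 1, "NAME": "ada"}, "after": {"ID": 1, "NAME": "ada l."}},
    {"op": "c", "before": None, "after": {"ID": 2, "NAME": "bob"}},
    {"op": "d", "before": {"ID": 2, "NAME": "bob"}, "after": None},
]
rows = reduce_changes(events, "ID")
```

Four events collapse into two rows. Row 1 arrives as an update with action INSERT, so the merge inserts it if the key is new and updates it otherwise. Row 2 was created and deleted within the same batch, so the merge finds no match for its DELETE and it never reaches the destination table.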

Conclusion

Whether you choose Snowpipe for its automation or Bytewax for its simplicity and flexibility, both methods offer robust solutions for ingesting streaming data into Snowflake. Bytewax, in particular, stands out for its ability to connect to various data sources and handle complex transformations, making it a versatile choice for modern data workflows. Using the connector above, you can see how we created a Python CDC pipeline from MySQL to Snowflake.

For more insights and updates on how Bytewax can optimize your data processes, star the repo, subscribe to our newsletter, or reach out to our team for personalized assistance on Slack!

Zander Matheson

CEO, Founder
Zander is a seasoned data engineer who has founded and currently helms Bytewax. Zander has worked in the data space since 2014 at Heroku, GitHub, and an NLP startup. Before that, he attended business school at the UT Austin and HEC Paris in Europe.