Introducing the Bytewax InfluxDB Connector

By Zander Matheson

We’re excited to announce the release of the Bytewax InfluxDB Connector, which allows easy integration between Bytewax and InfluxDB, a leading time-series database.

Whether you’re working with IoT data, telemetry systems, market data, or other time-series metrics, this connector streamlines pulling real-time data from InfluxDB, processing it in Bytewax for insights like downsampling and aggregation, and writing data streams back to InfluxDB.

To skip to the code, check out the examples in the connector repository.

This connector enables you to:

  • Write time-series data to InfluxDB using Bytewax's InfluxDBSink.
  • Pull data directly from InfluxDB using Bytewax's InfluxDBSource.
  • Perform real-time transformations in Bytewax, such as downsampling, aggregations, anomaly detection, and predictive analytics.

It's dead easy to use!

Installation

pip install bytewax-influxdb

Writing Data

First, you will need your InfluxDB connection details and your API token to get started.

Next, writing data to InfluxDB is as simple as passing a list of items from your dataflow into a sink. Below is an example that writes data in line protocol format, but you could also pass dictionaries, pandas DataFrames, InfluxDB Point objects, Polars DataFrames, and more.

import os
import logging
from datetime import timedelta

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource
from bytewax.influxdb import InfluxDBSink

TOKEN = os.getenv(
    "INLFUXDB_TOKEN",
    "my-token",
)
DATABASE = os.getenv("INFLUXDB_DATABASE", "testing")
ORG = os.getenv("INFLUXDB_ORG", "dev")

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Note the raw strings: line protocol escapes spaces in tag values with a
# backslash, which a plain Python string would treat as an invalid escape.
lines = [
    r"home,room=Living\ Room temp=21.1,hum=35.9,co=0i 1724258000",
    "home,room=Kitchen temp=21.0,hum=35.9,co=0i 1724258000",
    r"home,room=Living\ Room temp=21.4,hum=35.9,co=0i 1724259000",
    "home,room=Kitchen temp=23.0,hum=36.2,co=0i 1724259000",
    r"home,room=Living\ Room temp=21.8,hum=36.0,co=0i 1724260000",
    "home,room=Kitchen temp=22.7,hum=36.1,co=0i 1724260000",
    r"home,room=Living\ Room temp=22.2,hum=36.0,co=0i 1724261000",
    "home,room=Kitchen temp=22.4,hum=36.0,co=0i 1724261000",
    r"home,room=Living\ Room temp=22.2,hum=35.9,co=0i 1724262000",
    "home,room=Kitchen temp=22.5,hum=36.0,co=0i 1724262000",
    r"home,room=Living\ Room temp=22.4,hum=36.0,co=0i 1724263000",
    "home,room=Kitchen temp=22.8,hum=36.5,co=1i 1724263000",
    r"home,room=Living\ Room temp=22.3,hum=36.1,co=0i 1724264000",
    "home,room=Kitchen temp=22.8,hum=36.3,co=1i 1724264000",
    r"home,room=Living\ Room temp=22.3,hum=36.1,co=1i 1724265000",
    "home,room=Kitchen temp=22.7,hum=36.2,co=3i 1724265000",
    r"home,room=Living\ Room temp=22.4,hum=36.0,co=4i 1724266000",
    "home,room=Kitchen temp=22.4,hum=36.0,co=7i 1724266000",
    r"home,room=Living\ Room temp=22.6,hum=35.9,co=5i 1724267000",
    "home,room=Kitchen temp=22.7,hum=36.0,co=9i 1724267000",
    r"home,room=Living\ Room temp=22.8,hum=36.2,co=9i 1724268000",
    "home,room=Kitchen temp=23.3,hum=36.9,co=18i 1724268000",
    r"home,room=Living\ Room temp=22.5,hum=36.3,co=14i 1724269000",
    "home,room=Kitchen temp=23.1,hum=36.6,co=22i 1724269000",
    r"home,room=Living\ Room temp=22.2,hum=36.4,co=17i 1724270000",
    "home,room=Kitchen temp=22.7,hum=36.5,co=26i 1724270000",
]

flow = Dataflow("simple_output")

stream = op.input("input", flow, TestingSource(lines))
keyed_stream = op.key_on("key_location", stream, lambda x: x.split(",")[0])
op.inspect("check_stream", stream)
batch_readings = op.collect(
    "lines", keyed_stream, max_size=10, timeout=timedelta(milliseconds=50)
)

op.output(
    "out",
    batch_readings,
    InfluxDBSink(
        host="https://us-east-1-1.aws.cloud2.influxdata.com",
        database=DATABASE,
        org=ORG,
        token=TOKEN,
        write_precision="s",
    ),
)
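
Line protocol strings are just one option. As a minimal sketch (assuming the Point class exported by the influxdb_client_3 package; adjust the import to match your client version), you could build Point objects instead and feed them through the same collect and output steps:

from datetime import datetime, timezone

from influxdb_client_3 import Point  # assumed import path; varies by client version

# Build a Point equivalent to one of the line-protocol strings above.
point = (
    Point("home")
    .tag("room", "Kitchen")
    .field("temp", 22.5)
    .field("hum", 36.0)
    .field("co", 26)
    .time(datetime(2024, 8, 21, 16, 33, 20, tzinfo=timezone.utc))
)

A list of such points batches and writes exactly like the lines list in the example above.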

Reading Data

Reading from InfluxDB is simple as well!

Below is an example that polls InfluxDB every 15 seconds for the most recent data from the home measurement:

import os
import logging
from datetime import timedelta, datetime, timezone

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.influxdb import InfluxDBSource

TOKEN = os.getenv(
    "INFLUXDB_TOKEN",
    "my-token",
)
DATABASE = os.getenv("INFLUXDB_DATABASE", "testing")
ORG = os.getenv("INFLUXDB_ORG", "dev")

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

flow = Dataflow("a_simple_example")

inp = op.input(
    "inp",
    flow,
    InfluxDBSource(
        timedelta(seconds=15),
        "https://us-east-1-1.aws.cloud2.influxdata.com",
        DATABASE,
        TOKEN,
        "home",
        ORG,
        tz=timezone.utc,
    ),
)
op.inspect("input", inp)

Running the dataflow logs each fetched batch, for example:

> INFO:bytewax.influxdb:last_time:2024-08-21 17:03:20+00:00
> a_simple_example.input: pyarrow.RecordBatch
co: int64
hum: double
room: string
temp: double
time: timestamp[ns] not null
----
co: [0,0,0,0]
hum: [35.9,36.2,35.9,35.9]
room: ["Kitchen","Kitchen","Living Room","Living Room"]
temp: [21,23,21.1,21.4]
time: [2024-08-21 16:33:20.000000000,2024-08-21 16:50:00.000000000,2024-08-21 16:33:20.000000000,2024-08-21 16:50:00.000000000]
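
Because each item the source emits is a pyarrow.RecordBatch, you can aggregate it in place. Here is a minimal sketch (the step names and the per-room mean are illustrative, not part of the connector) that downsamples each batch to one mean temperature per room:

from collections import defaultdict

def mean_temp_per_room(batch):
    """Reduce a pyarrow.RecordBatch to {room: mean temperature}."""
    cols = batch.to_pydict()  # columns -> plain Python lists
    sums = defaultdict(lambda: [0.0, 0])
    for room, temp in zip(cols["room"], cols["temp"]):
        sums[room][0] += temp
        sums[room][1] += 1
    return {room: total / n for room, (total, n) in sums.items()}

means = op.map("mean_temp", inp, mean_temp_per_room)
op.inspect("means", means)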

Purchasing the Connector

The Bytewax InfluxDB Connector is available via subscription at modules.bytewax.io. A subscription unlocks advanced features and ongoing support.

How it Works

The Bytewax InfluxDB Connector consists of two main components: the Source and the Sink, each responsible for interfacing with InfluxDB in distinct ways. You can see the source code in the GitHub Repository. We will discuss how they work in-depth below.

The InfluxDB Source

The InfluxDBSource class is designed to read data from an InfluxDB instance by partitioning the data retrieval process into fixed intervals. When initializing the Source, you can define:

  • Interval: The time between polls to fetch new data.
  • Host: InfluxDB's server address.
  • Database: The target InfluxDB database.
  • Measurement: The specific time-series data being queried.
  • Start time: The timestamp from which querying should begin.

For now, the Source is limited to a single partition because the Python client cannot partition the input query. If you have resource-intensive processing downstream, though, you can spread the data across multiple workers with Bytewax's redistribute operator, as sketched below.
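
For example (a sketch; the step names and expensive_fn are illustrative):

# Fan the single-partition input out across all workers
# before a resource-intensive step.
balanced = op.redistribute("rebalance", inp)
heavy = op.map("expensive_step", balanced, expensive_fn)  # hypothetical function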

The connector’s Source handles querying InfluxDB at predefined intervals and transforms the data into Arrow RecordBatches, which are efficient for in-memory processing in Python. Bytewax pipelines can then process these RecordBatches as they would any other data. For each partition, the source queries data between last_time and now, resuming seamlessly after any interruptions via the snapshot() method, which tracks the last query time.
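
Conceptually, the partition behaves like a stateful Bytewax source partition whose snapshot is the last query timestamp. The following is an illustrative sketch, not the connector's actual source (query_influxdb is a hypothetical helper):

from datetime import datetime, timezone
from bytewax.inputs import StatefulSourcePartition

class _IllustrativePartition(StatefulSourcePartition):
    """Sketch of resume logic: query (last_time, now], then snapshot last_time."""

    def __init__(self, resume_state):
        # After a restart, Bytewax hands back the last snapshot.
        self.last_time = resume_state or datetime.now(timezone.utc)

    def next_batch(self):
        now = datetime.now(timezone.utc)
        batch = query_influxdb(self.last_time, now)  # hypothetical query helper
        self.last_time = now
        return [batch]

    def snapshot(self):
        return self.last_time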

The typical output of this source is Arrow RecordBatches, which can be converted into formats like pandas DataFrames for further processing.
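
For instance (a sketch reusing the inp stream from the reading example, assuming pandas is installed), the conversion is a single call inside any downstream step:

def to_dataframe(batch):
    return batch.to_pandas()  # pyarrow.RecordBatch -> pandas.DataFrame

frames = op.map("to_df", inp, to_dataframe)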

The InfluxDB Sink

The InfluxDBSink is the counterpart to the Source. This class writes processed data from a Bytewax dataflow back to an InfluxDB instance. It supports multiple workers, making it highly scalable. Key parameters for initialization include:

  • Host: The InfluxDB server address.
  • Database: The target database for writing data.
  • Write precision: Specifies the precision (e.g., nanoseconds, microseconds) for storing time-series data points.
  • Token and Org: For authentication and identifying the InfluxDB organization.

The Sink writes a batch of items, which can be in various formats such as strings, dictionaries, or InfluxDB Point objects, to InfluxDB. It allows for writing real-time, processed data back into InfluxDB using the native write capabilities of the InfluxDB client.
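
For example, a dictionary-style batch could look like the sketch below (this uses the dictionary shape accepted by the InfluxDB Python clients; confirm the exact schema against your client version):

readings = [
    {
        "measurement": "home",
        "tags": {"room": "Kitchen"},
        "fields": {"temp": 22.5, "hum": 36.0, "co": 26},
        "time": 1724270000,
    },
]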

This setup enables Bytewax users to create dataflows that not only ingest time-series data but also write it back to InfluxDB for further analysis, visualization, or storage. The connector's flexibility in terms of input and output types (Arrow RecordBatches in the Source and various formats in the Sink) makes it a powerful tool for building dynamic, real-time data pipelines.

With the Bytewax InfluxDB Connector, you can now easily manage your time-series data, creating end-to-end data pipelines that fit seamlessly into your existing Bytewax projects. The connector can be purchased via subscription on modules.bytewax.io, enabling you to unlock its full potential.


Zander Matheson

CEO, Founder
Zander is a seasoned data engineer who founded and currently helms Bytewax. He has worked in the data space since 2014 at Heroku, GitHub, and an NLP startup. Before that, he attended business school at UT Austin and HEC Paris.