Anomaly Detection with Bytewax & Redpanda

By Zander Matheson

Uses Bytewax Version 0.8.0

Anomaly detection is a common tool used in real-time systems to detect when systems change. Some examples are a manufacturing plant failure, a security intrusion in a network or a change in user behavior. In this example, we will use machine learning to detect anomalies in AWS CloudWatch CPU utilization metrics for multiple EC2 instances. We are going to develop this dataflow in a Jupyter notebook and use Redpanda as our streaming data source.

Redpanda is a newer player in the streaming ecosystem. The company behind the project is Redpanda Data, formerly known as Vectorized. From their website:

Redpanda is a Kafka® compatible event streaming platform. No Zookeeper®, no JVM, and no code changes required. Use all your favorite open source tooling.

The steps are the following:

  1. Run Redpanda locally
  2. Create a stream of EC2 CPU utilization in a Redpanda topic
  3. Consume the events from Redpanda with Bytewax in a Jupyter notebook
  4. Create a Bytewax dataflow to visualize the data stream and detect anomalies in CPU utilization

If you want to follow along with the notebook, you can find it in the Bytewax repo.

Running Redpanda

Start by running Redpanda:

docker run -d --name=redpanda-1 --rm -p 9092:9092 docker.vectorized.io/vectorized/redpanda:latest redpanda start --overprovisioned --smp 1 --memory 1G --reserve-memory 0M --node-id 0 --check=false

For more information on getting started with Redpanda, check out their official docs here.

Set up the Python dependencies

We're going to install HoloViews, River, kafka-python, Bytewax, and pandas to get started:

!pip install holoviews==1.14.8 kafka-python==2.0.2 bytewax==0.8.0 river==0.10.1 pandas

HoloViews is a plotting library that can be used for creating dynamic plots, River is a library for online machine learning, kafka-python is an Apache Kafka client library (we'll use this to write to and read from Redpanda), and pandas is a common data manipulation library.

Creating a Data Stream

We're going to stream CloudWatch CPU metrics for this example, and we'll start by writing them to a Redpanda topic. Since Redpanda is compatible with the Kafka API, we can use the kafka-python library to do this.

In the code below a topic called ec2_metrics is created and data is loaded from a file called ec2_metrics.csv to the topic. For this example, the data is read from the earliest offset, but in a real world scenario, it would be consumed as it arrives.

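import json

import pandas as pd
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic

topic_name = "ec2_metrics"
servers = ["localhost:9092"]
producer = KafkaProducer(bootstrap_servers=servers)

try:
    # Create the Kafka topic with 3 partitions
    topic = NewTopic(name=topic_name, num_partitions=3, replication_factor=1)
    admin = KafkaAdminClient(bootstrap_servers=servers)
    admin.create_topics([topic])
except Exception:
    # Most likely the topic already exists from a previous run
    print(f"Topic {topic_name} is already created")

# Send each row to the topic as a JSON-encoded message
df = pd.read_csv("examples/sample_data/ec2_metrics.csv")
for row in df[["index", "timestamp", "value", "instance"]].to_dict('records'):
    producer.send(topic_name, json.dumps(row).encode())

# Make sure buffered messages are delivered before moving on
producer.flush()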
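
To make the message format concrete, here is a small sketch of the round trip a single row takes: encoded to JSON bytes on the producer side and decoded again on the consumer side. The row values below are made up for illustration; only the field names come from the code above.

```python
import json

# Hypothetical row; the field names match the columns selected above,
# but the values are invented for this illustration.
row = {
    "index": 1,
    "timestamp": "2014-02-14 14:27:00",
    "value": 51.8,
    "instance": "fe7f93",
}

# What the producer sends: a UTF-8 encoded JSON string (bytes).
payload = json.dumps(row).encode()

# What a consumer does with message.value: json.loads accepts bytes directly.
decoded = json.loads(payload)

print(type(payload).__name__)
print(decoded == row)
```

Nothing here talks to Redpanda, so it is a quick way to check the payload shape before producing real messages.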

With the topic loaded, we can start developing our dataflow in a Jupyter Notebook!

Writing the Dataflow

Importing the Necessary Libraries

import json
import random

from bytewax import Dataflow, spawn_cluster, inputs
import bytewax
from kafka import KafkaConsumer

from river import anomaly
import holoviews as hv
from holoviews.streams import Buffer
import pandas as pd
hv.extension('bokeh')

Defining our input

Input to Bytewax can scale with partitions in Redpanda. This can be useful if topics are partitioned on a key that we then want to aggregate against or if we need to keep up with massive throughput. (Parallelized IO is one of the benefits of using Bytewax!) More on this in the Scaling Up section.

Once the input function is defined, it is passed as a builder function when we run our dataflow.

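def input_builder(worker_index, total_workers):
    # Each worker creates its own consumer and reads from the earliest offset
    consumer = KafkaConsumer(
        'ec2_metrics',
        bootstrap_servers=["localhost:9092"],
        auto_offset_reset='earliest'
    )
    for message in consumer:
        data = json.loads(message.value)
        # Emit (epoch, item) pairs; the row index serves as the epoch
        yield data['index'], data

flow = Dataflow()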
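
If you want to check the shape of what the builder yields without a running broker, a minimal sketch like the following can help. It assumes a hypothetical `fake_input_builder` that reads from an in-memory list instead of a `KafkaConsumer`; the message values are made up.

```python
import json


def fake_input_builder(raw_messages):
    """Hypothetical stand-in for input_builder: reads a list, not a Kafka topic."""
    for raw in raw_messages:
        data = json.loads(raw)
        # Same shape as the tutorial's generator: (epoch, item).
        yield data["index"], data


# Invented messages, encoded the same way the producer encodes them.
raw_messages = [
    json.dumps({"index": i, "value": v, "instance": "fe7f93"}).encode()
    for i, v in enumerate([10.0, 12.5, 90.0])
]

items = list(fake_input_builder(raw_messages))
for epoch, item in items:
    print(epoch, item)
```

The first element of each pair is what the output handler later unpacks as `epoch`.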

Group by Key and Filter

Since there are multiple instances and our goal is to detect anomalies in each individual instance, we will aggregate the data by reshaping it to the format (key, data). In this example instance ID is the key.

Our anomaly detection algorithm expects the data to be between 0 and 1, but our input data is in percentages up to 100, so in addition to this reshaping, the percentages in the data are divided by 100.

def group_instance_and_normalize(data):
    data['value'] = float(data['value'])/100
    return data['instance'], data

To make development and visualization simpler, this code will focus on the CPU data from a single instance using the filter operator.

flow.map(group_instance_and_normalize)
flow.filter(lambda x: x[0] == 'fe7f93')

At this point our data would look something like:

("fe7f93", {"index": "1", "value": 0.08, "instance": "fe7f93"})
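
The two steps above can be tried on plain Python lists before wiring them into a dataflow. This is only a sketch: the records are made up, and list comprehensions stand in for the `map` and `filter` operators.

```python
def group_instance_and_normalize(data):
    # Scale the percentage to the 0 to 1 range and key the record by instance ID.
    data["value"] = float(data["value"]) / 100
    return data["instance"], data


# Invented records with the same fields as the stream.
records = [
    {"index": "1", "value": "8.0", "instance": "fe7f93"},
    {"index": "2", "value": "11.0", "instance": "c6585a"},
    {"index": "3", "value": "55.5", "instance": "fe7f93"},
]

# Equivalent of flow.map(...): copy each record so the originals stay untouched.
keyed = [group_instance_and_normalize(dict(r)) for r in records]

# Equivalent of flow.filter(...): keep a single instance.
selected = [item for item in keyed if item[0] == "fe7f93"]

for key, data in selected:
    print(key, data)
```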

Defining our Anomaly Detector

We use an unsupervised ML algorithm that keeps learning as new data points arrive, over a defined window length. It is a tree-based approach called Half-Space Trees. River provides a really handy HalfSpaceTrees class for it. In the code below, an AnomalyDetector class inherits from the HalfSpaceTrees class and adds its own update method. This class and the corresponding update method are used in the stateful_map operator in our dataflow. Stateful map is a one-to-one transformation of values in (key, value) pairs, but it allows you to reference a persistent state for each key when doing the transformation.

Since this is a stateful operator, it requires the input stream to have (key, value) tuples to ensure that all relevant values are routed to the relevant state.
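
To illustrate the idea of per-key state, here is a pure-Python sketch of a stateful map. It is not how Bytewax implements the operator; the `stateful_map` function and the `running_mean` mapper below are hypothetical helpers that only mimic the builder and mapper contract described above, where the mapper returns the updated state together with the output value.

```python
def stateful_map(items, builder, mapper):
    """Pure-Python illustration of a stateful map over (key, value) pairs."""
    states = {}
    out = []
    for key, value in items:
        # Build the state the first time a key is seen.
        if key not in states:
            states[key] = builder(key)
        # The mapper returns (updated_state, output_value).
        states[key], result = mapper(states[key], value)
        out.append((key, result))
    return out


def running_mean(state, value):
    # State is (count, total); the output is the mean seen so far for this key.
    count, total = state
    count, total = count + 1, total + value
    return (count, total), total / count


items = [("a", 1.0), ("b", 10.0), ("a", 3.0), ("b", 20.0)]
result = stateful_map(items, lambda key: (0, 0.0), running_mean)
print(result)
```

Each key keeps its own running mean, which is the same reason each EC2 instance needs its own detector.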

class AnomalyDetector(anomaly.HalfSpaceTrees):
    '''
    Our anomaly detector inherits from the HalfSpaceTrees
    object from the river package and has the following inputs:

    n_trees – defaults to 10
    height – defaults to 8
    window_size – defaults to 250
    limits (Dict[Hashable, Tuple[float, float]]) – defaults to None
    seed (int) – defaults to None
    '''
    def update(self, data):
        # Learn from the new point, then score it
        self.learn_one({'value': data['value']})
        data['score'] = self.score_one({'value': data['value']})
        # Flag scores above 0.7 as anomalous
        if data['score'] > .7:
            data['anom'] = 1
        else:
            data['anom'] = 0
        # Return the updated state and the output value
        return self, (data['index'], data['timestamp'], data['value'], data['score'], data['anom'])

flow.stateful_map(lambda key: AnomalyDetector(n_trees=5, height=3, window_size=5, seed=42), AnomalyDetector.update)

Our data would now look something like:

# ("fe7f93", (index, timestamp, value, score, anom))
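
If River is not installed, the update contract can still be explored with a stand-in. The `WindowDetector` below is hypothetical and is not Half-Space Trees: it scores a point by its distance from the mean of a short rolling window, with an arbitrary scale factor, and unlike the class above it scores before it learns. It only shows how the state and the output tuple travel together.

```python
from collections import deque


class WindowDetector:
    """Hypothetical stand-in for AnomalyDetector; NOT Half-Space Trees."""

    def __init__(self, window_size=5, threshold=0.7):
        self.window = deque(maxlen=window_size)
        self.threshold = threshold

    def update(self, data):
        # Score is the distance from the recent mean, doubled and capped at 1.
        if self.window:
            mean = sum(self.window) / len(self.window)
            score = min(abs(data["value"] - mean) * 2, 1.0)
        else:
            score = 0.0
        self.window.append(data["value"])
        anom = 1 if score > self.threshold else 0
        # Same return shape as the tutorial: (state, output tuple).
        return self, (data["index"], data["timestamp"], data["value"], score, anom)


detector = WindowDetector()
values = [0.10, 0.11, 0.09, 0.10, 0.95]  # invented, already normalized
outputs = []
for i, v in enumerate(values):
    detector, out = detector.update({"index": i, "timestamp": f"t{i}", "value": v})
    outputs.append(out)

flags = [o[4] for o in outputs]
print(flags)
```

Only the last value, which jumps far away from the recent mean, is flagged.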

Visualizing the data and the anomalies

Often the easiest way to determine if something is working as expected is to visualize it. Our code will integrate with HoloViews in a way that uses stateful_map to continuously update the plot by sending data to a HoloViews Buffer object. This will update a DynamicMap object. To use the stateful_map operator, a send_to_pipe function is defined that pushes data to the Buffer.

For our visualization, two DynamicMaps are created: one is a line chart of CPU utilization and the other is a series of points corresponding to the CPU values. To better see the anomalous utilization values, we will color the CPU value points according to the anomaly flag from the detection algorithm (stored in the score column of the buffer). Finally, we call opts on the combined plot object in Jupyter to add formatting and display it.

If you are following along with the Jupyter notebook, this will create an empty plot that will populate like the graphic shown in the Run step.

data = pd.DataFrame([(0,0)], columns=['cpu_value', 'score'])
buffer = Buffer(data, length = 100)
utilization = hv.DynamicMap(hv.Curve, streams=[buffer])
anom_score = hv.DynamicMap(hv.Points, streams=[buffer])

def send_to_pipe(buffer, x):
    index, timestamp, value, score, anom = x
    buffer.send(pd.DataFrame({'cpu_value':value,'score':anom},index=[index], columns=['cpu_value','score']))
    anom = bool(anom)
    return buffer, (index, timestamp, value, score, anom)

# update the buffer object using the send_to_pipe function
flow.stateful_map(lambda key: buffer, send_to_pipe)

# show our plot
(utilization * anom_score).opts(
    hv.opts.Points(color='score', size=5, cmap='kr'),
    hv.opts.Curve(ylim=(0,1), width=800, show_grid=True))
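
The `length=100` argument means the plot only ever shows the most recent 100 rows. As a rough analogy (an illustration with `collections.deque`, not the HoloViews implementation, which also triggers plot updates), a bounded buffer behaves like this:

```python
from collections import deque

# A bounded buffer that keeps only the 100 most recent rows.
window = deque(maxlen=100)

# Invented rows, using the same column names as the DataFrame above.
for index in range(250):
    window.append({"index": index, "cpu_value": 0.5, "score": 0})

print(len(window))
print(window[0]["index"], window[-1]["index"])
```

Older rows fall off the front as new ones arrive, which keeps the plot responsive on a long-running stream.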

Filter and Inspect

Next, our dataflow will filter the data to only the anomalous values and then print them with the capture operator. Since we are developing our code to eventually run as a cluster, we will define an output builder here to print the results.

def output_builder(worker_index, worker_count):
    def output_handler(epoch_time):
        epoch, (instance, (index, t, value, score, is_anomalous)) = epoch_time
        print(
            f"{instance}: time = {t}, value = {value:.3f}, score = {score:.2f}, {is_anomalous}"
        )
    return output_handler

flow.filter(lambda x: x[1][4])
flow.capture()
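
The nested unpacking in the handler is easy to get wrong, so here is a sketch that exercises it on hand-written items. `formatting_output_builder` is a hypothetical variant that returns the formatted string instead of printing it; the first item and both index values are made up, while the second item reuses values from the sample output of this tutorial.

```python
def formatting_output_builder(worker_index, worker_count):
    """Hypothetical variant of output_builder that returns the line instead of printing."""
    def output_handler(epoch_item):
        epoch, (instance, (index, t, value, score, is_anomalous)) = epoch_item
        return f"{instance}: time = {t}, value = {value:.3f}, score = {score:.2f}, {is_anomalous}"
    return output_handler


# (key, (index, timestamp, value, score, anom)) items as they leave send_to_pipe.
items = [
    ("fe7f93", (70, "2014-02-14 20:17:00", 0.5, 0.10, False)),   # invented
    ("fe7f93", (71, "2014-02-14 20:22:00", 0.713, 0.93, True)),  # index is invented
]

# Same predicate as flow.filter: position 4 of the value tuple is the anomaly flag.
anomalous = [x for x in items if x[1][4]]

handler = formatting_output_builder(0, 1)
# Captured items reach the handler as (epoch, item).
lines = [handler((x[1][0], x)) for x in anomalous]
print(lines)
```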

Running the dataflow

The dataflow is ready to run:

spawn_cluster(flow, input_builder, output_builder)

If you are following along in a notebook, you will see the output from the inspect step and an animated plot like the one below:

fe7f93: time = 2014-02-14 20:22:00, value = 0.713, score = 0.93, True
fe7f93: time = 2014-02-14 20:27:00, value = 0.555, score = 0.88, True
fe7f93: time = 2014-02-14 23:07:00, value = 0.518, score = 0.70, True
fe7f93: time = 2014-02-15 21:02:00, value = 0.371, score = 0.80, True
fe7f93: time = 2014-02-15 21:12:00, value = 0.524, score = 0.85, True
fe7f93: time = 2014-02-17 06:17:00, value = 0.637, score = 0.91, True
fe7f93: time = 2014-02-17 07:07:00, value = 0.660, score = 0.74, True
fe7f93: time = 2014-02-17 07:47:00, value = 0.674, score = 0.74, True
fe7f93: time = 2014-02-17 08:07:00, value = 0.034, score = 0.84, True
fe7f93: time = 2014-02-17 08:12:00, value = 0.033, score = 0.84, True
fe7f93: time = 2014-02-17 08:27:00, value = 0.032, score = 0.84, True
fe7f93: time = 2014-02-17 08:52:00, value = 0.033, score = 0.84, True
... 

Scaling Up

Now to scale things up a bit! The production use case has 3 partitions, so we can match the number of workers in the spawn_cluster() call to the number of partitions. Because each worker builds its own consumer in input_builder2 and all of those consumers share the same group_id, this will ingest data from the topic in parallel.
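
Consumers that share a group_id split the partitions of a topic between them, and each partition is read by at most one consumer in the group. The sketch below illustrates that idea with a simple round-robin; `assign_partitions` is a hypothetical helper, and the assignment strategy a real broker and client negotiate may distribute partitions differently.

```python
def assign_partitions(partitions, worker_count):
    """Round-robin illustration only; real Kafka assignors may differ."""
    assignment = {worker: [] for worker in range(worker_count)}
    for i, partition in enumerate(partitions):
        assignment[i % worker_count].append(partition)
    return assignment


partitions = [0, 1, 2]

matched = assign_partitions(partitions, 3)  # one partition per worker
fewer = assign_partitions(partitions, 2)    # one worker reads two partitions
extra = assign_partitions(partitions, 4)    # one worker has nothing to read

print(matched)
print(fewer)
print(extra)
```

This is why matching the worker count to the partition count is a sensible default: fewer workers leave parallelism unused, and extra workers sit idle.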

At this point, assuming we are happy with the output that we saw in the previous cells with the ML algorithm parameters we passed, we do not need to visualize the single instance anymore. Both of the steps to filter and visualize have been removed from flow2 shown below.

Running the dataflow will print out the detected anomalies for all the instances:

def input_builder2(worker_index, total_workers):
    consumer = KafkaConsumer(
        'ec2_metrics',
        bootstrap_servers=["localhost:9092"],
        auto_offset_reset='earliest',
        group_id = 'anomaly_detector0'
    )
    for message in consumer:
        data = json.loads(message.value)
        yield data['index'], data

# Define the dataflow object.
flow2 = Dataflow()
# Items arrive as (epoch, data), where data has "index", "timestamp", "value", and "instance".
flow2.map(group_instance_and_normalize)
# After the map: (instance, data), with data["value"] scaled to the range 0 to 1.
flow2.stateful_map(lambda key: AnomalyDetector(n_trees=5, height=3, window_size=5, seed=42), AnomalyDetector.update)
# After the stateful_map: (instance, (index, timestamp, value, score, anom)).
flow2.filter(lambda x: bool(x[1][4]))
flow2.capture()
# Run with 3 workers to match the 3 partitions of the topic.
spawn_cluster(flow2, input_builder2, output_builder, worker_count_per_proc=3)

    77c1ca: time = 2014-04-02 15:05:00, value = 0.924, score = 0.93, 1
    77c1ca: time = 2014-04-02 17:05:00, value = 0.732, score = 0.93, 1
    77c1ca: time = 2014-04-02 17:10:00, value = 0.841, score = 0.93, 1
    77c1ca: time = 2014-04-02 17:45:00, value = 0.945, score = 0.85, 1
    77c1ca: time = 2014-04-02 18:15:00, value = 0.952, score = 0.75, 1
    77c1ca: time = 2014-04-02 18:25:00, value = 0.765, score = 0.83, 1
    fe7f93: time = 2014-02-14 19:57:00, value = 0.523, score = 0.85, 1
    fe7f93: time = 2014-02-14 20:02:00, value = 0.548, score = 0.88, 1
    fe7f93: time = 2014-02-14 20:07:00, value = 0.557, score = 0.88, 1
    fe7f93: time = 2014-02-14 20:22:00, value = 0.713, score = 0.73, 1
    77c1ca: time = 2014-04-02 21:30:00, value = 0.929, score = 0.92, 1
    77c1ca: time = 2014-04-02 21:35:00, value = 0.912, score = 0.92, 1
    77c1ca: time = 2014-04-02 21:40:00, value = 0.891, score = 0.92, 1
    77c1ca: time = 2014-04-02 22:20:00, value = 0.516, score = 0.77, 1
    77c1ca: time = 2014-04-02 22:30:00, value = 0.639, score = 0.72, 1
    77c1ca: time = 2014-04-02 22:50:00, value = 0.062, score = 0.80, 1
    fe7f93: time = 2014-02-14 22:57:00, value = 0.537, score = 0.88, 1
    fe7f93: time = 2014-02-14 23:02:00, value = 0.515, score = 0.85, 1
    77c1ca: time = 2014-04-02 22:55:00, value = 0.001, score = 0.73, 1

We think Redpanda and Bytewax are an awesome combo!

What did you think of this tutorial? Give us some feedback in our Slack or on Twitter.

Zander Matheson

CEO, Founder
Zander is a seasoned data engineer who has founded and currently helms Bytewax. Zander has worked in the data space since 2014 at Heroku, GitHub, and an NLP startup. Before that, he attended business school at UT Austin and HEC Paris in Europe.