Uses Bytewax Version 0.8.0
Anomaly detection is a common technique in real-time systems for detecting when a system's behavior changes, whether that is a manufacturing plant failure, a security intrusion in a network, or a shift in user behavior. In this example, we will use machine learning to detect anomalies in AWS CloudWatch CPU utilization metrics for multiple EC2 instances. We will develop this dataflow in a Jupyter notebook and use Redpanda as our streaming data source.
Redpanda is a newer player in the streaming ecosystem. The company behind the project is Redpanda Data, formerly known as Vectorized. From their website:
Redpanda is a Kafka® compatible event streaming platform. No Zookeeper®, no JVM, and no code changes required. Use all your favorite open source tooling.
The steps are the following:
- Run Redpanda locally
- Create a stream of EC2 CPU utilization in a Redpanda topic
- Consume the events from Redpanda with Bytewax in a Jupyter notebook
- Create a Bytewax dataflow to visualize the data stream and detect anomalies in CPU utilization
If you want to follow along with the notebook, you can find it in the bytewax repo.
Running Redpanda
Start by running Redpanda:
docker run -d --name=redpanda-1 --rm -p 9092:9092 docker.vectorized.io/vectorized/redpanda:latest redpanda start --overprovisioned --smp 1 --memory 1G --reserve-memory 0M --node-id 0 --check=false
For more information on getting started with Redpanda, check out their official docs here.
Set up the Python dependencies
We're going to install HoloViews, kafka-python, Bytewax, River, and pandas to get started:
!pip install holoviews==1.14.8 kafka-python==2.0.2 bytewax==0.8.0 river==0.10.1 pandas
HoloViews is a plotting library for creating dynamic plots, River is a library for online machine learning, kafka-python is a Python client for the Apache Kafka API (we'll use it to talk to Redpanda), and pandas is a common data manipulation library.
Creating a Data Stream
We're going to stream CloudWatch CPU metrics for this example, and we'll start by writing them to a Redpanda topic. Since Redpanda is compatible with the Kafka API, we can use the kafka-python library to do this.
In the code below, a topic called ec2_metrics is created and data from a file called ec2_metrics.csv is loaded into it. For this example, the data will later be read from the earliest offset; in a real-world scenario, it would be consumed as it arrives.
import json

import pandas as pd
from kafka import KafkaAdminClient, KafkaProducer
from kafka.admin import NewTopic

servers = ["localhost:9092"]
topic_name = "ec2_metrics"
producer = KafkaProducer(bootstrap_servers=servers)

try:
    # Create the Kafka topic with 3 partitions
    topic = NewTopic(name=topic_name, num_partitions=3, replication_factor=1)
    admin = KafkaAdminClient(bootstrap_servers=servers)
    admin.create_topics([topic])
except Exception:
    print(f"Topic {topic_name} is already created")

# Load the CSV and send each row to the topic as a JSON-encoded message
df = pd.read_csv("examples/sample_data/ec2_metrics.csv")
for row in df[["index", "timestamp", "value", "instance"]].to_dict('records'):
    producer.send(topic_name, json.dumps(row).encode())
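To sanity-check that the messages landed in the topic, you can read a few of them back with a throwaway consumer. This is a minimal sketch (not part of the tutorial notebook) that reuses the topic_name and servers defined above:

from kafka import KafkaConsumer

# Throwaway consumer: read from the beginning and give up after 5s of silence
check = KafkaConsumer(
    topic_name,
    bootstrap_servers=servers,
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000,
)
for i, message in enumerate(check):
    print(json.loads(message.value))
    if i >= 4:  # only peek at the first few records
        break
check.close()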
With the topic loaded, we can start developing our dataflow in a Jupyter Notebook!
Writing the Dataflow
Importing the Necessary Libraries
import json
import random
from bytewax import Dataflow, spawn_cluster, inputs
import bytewax
from kafka import KafkaConsumer
from river import anomaly
import holoviews as hv
from holoviews.streams import Buffer
import pandas as pd
hv.extension('bokeh')
Defining our input
Input to Bytewax can scale with the number of partitions in Redpanda. This is useful if topics are partitioned on a key that we want to aggregate against, or if we need to keep up with high throughput. (Parallelized input is one of the benefits of using Bytewax!) More on this in the Scaling Up section.
The input builder defined below is passed to spawn_cluster when we run the dataflow; each worker calls it to get its own stream of input.
def input_builder(worker_index, total_workers):
    # Each worker creates its own consumer and reads from the beginning of the topic
    consumer = KafkaConsumer(
        'ec2_metrics',
        bootstrap_servers=["localhost:9092"],
        auto_offset_reset='earliest'
    )
    for message in consumer:
        data = json.loads(message.value)
        # Yield (epoch, payload) tuples; here the record index is used as the epoch
        yield data['index'], data
flow = Dataflow()
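Because each worker runs its own copy of the input builder, one way to spread the work across Redpanda's partitions is to pin each worker to a specific partition. Below is a hedged sketch (not part of the original notebook; partitioned_input_builder is a hypothetical name) that assumes the worker count matches the topic's three partitions. The Scaling Up section later uses a consumer group instead, which achieves a similar effect with less code.

import json

from kafka import KafkaConsumer, TopicPartition

def partitioned_input_builder(worker_index, total_workers):
    # Hypothetical variant: worker i reads only partition i of the topic
    consumer = KafkaConsumer(
        bootstrap_servers=["localhost:9092"],
        auto_offset_reset='earliest',
    )
    consumer.assign([TopicPartition('ec2_metrics', worker_index)])
    for message in consumer:
        data = json.loads(message.value)
        yield data['index'], data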
Group by Key and Filter
Since there are multiple instances and our goal is to detect anomalies in each individual instance, we will reshape the data into the format (key, data), where the key is the instance ID. Our anomaly detection algorithm also expects values between 0 and 1, but our input data is in percentages up to 100, so in addition to the reshaping, the percentages are divided by 100.
def group_instance_and_normalize(data):
    data['value'] = float(data['value']) / 100
    return data['instance'], data
To make development and visualization simpler, this code will focus on the CPU data from a single instance using the filter operator.
flow.map(group_instance_and_normalize)
flow.filter(lambda x: x[0] == 'fe7f93')
At this point our data would look something like:
("c6585a", {"index": "1", "value":"0.08", "instance":"fe7f93"})
Defining our Anomaly Detector
We use an unsupervised ML algorithm that learns incrementally from new data points over a defined window length. The approach is a tree-based method called Half Space Trees, and River provides a really handy class for it. In the code below, an AnomalyDetector class inherits from the HalfSpaceTrees class and adds its own update method. This class and its update method are used in the stateful_map operator in our dataflow. Stateful map is a one-to-one transformation of values in (key, value) pairs, but it allows you to reference a persistent state for each key when doing the transformation. Since this is a stateful operator, it requires the input stream to contain (key, value) tuples so that all values for a key are routed to that key's state.
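As a quick illustration of the stateful_map pattern (a toy sketch, not part of this dataflow; running_count and toy_flow are hypothetical names), the builder creates the initial state for each new key and the mapper returns the updated state plus the value to emit downstream:

def running_count(count, item):
    # count is this key's persistent state; item is the incoming value
    count += 1
    return count, (item, count)

# Each new key starts with a count of 0
toy_flow = Dataflow()
toy_flow.stateful_map(lambda key: 0, running_count)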
class AnomalyDetector(anomaly.HalfSpaceTrees):
    '''
    Our anomaly detector inherits from the HalfSpaceTrees
    object from the river package and has the following inputs:

    n_trees – defaults to 10
    height – defaults to 8
    window_size – defaults to 250
    limits (Dict[Hashable, Tuple[float, float]]) – defaults to None
    seed (int) – defaults to None
    '''

    def update(self, data):
        self.learn_one({'value': data['value']})
        data['score'] = self.score_one({'value': data['value']})
        if data['score'] > .7:
            data['anom'] = 1
        else:
            data['anom'] = 0
        return self, (data['index'], data['timestamp'], data['value'], data['score'], data['anom'])
flow.stateful_map(lambda key: AnomalyDetector(n_trees=5, height=3, window_size=5, seed=42), AnomalyDetector.update)
Our data would now look something like:
# (("fe7f93", {"index": "1", "value":0.08, "instance":"fe7f93", "score":0.02}))
Visualizing the data and the anomalies
Often the easiest way to determine whether something is working as expected is to visualize it. Our code integrates with HoloViews by using stateful_map to continuously update the plot: a send_to_pipe function pushes data to a HoloViews Buffer object, which in turn updates a DynamicMap object. For our visualization, two DynamicMaps are created: one is a line chart of CPU utilization and the other is a series of points corresponding to the CPU values. To better see the anomalous utilization values, the CPU points are colored according to the output of the anomaly detection algorithm. Finally, we call opts on the combined plot object in Jupyter to add formatting.
If you are following along with the Jupyter notebook, this will create an empty plot that will populate like the graphic shown in the Run step.
data = pd.DataFrame([(0,0)], columns=['cpu_value', 'score'])
buffer = Buffer(data, length = 100)
utilization = hv.DynamicMap(hv.Curve, streams=[buffer])
anom_score = hv.DynamicMap(hv.Points, streams=[buffer])
def send_to_pipe(buffer, x):
    index, timestamp, value, score, anom = x
    buffer.send(pd.DataFrame({'cpu_value': value, 'score': anom}, index=[index], columns=['cpu_value', 'score']))
    anom = bool(anom)
    return buffer, (index, timestamp, value, score, anom)
# update the buffer object using the send_to_pipe function
flow.stateful_map(lambda key: buffer, send_to_pipe)
# show our plot
(utilization * anom_score).opts(
    hv.opts.Points(color='score', size=5, cmap='kr'),
    hv.opts.Curve(ylim=(0, 1), width=800, show_grid=True))
Filter and Inspect
Next, our dataflow will filter the data down to only the anomalous values and then print them via the capture operator. Since we are developing our code to eventually run as a cluster, we define an output builder here to print the results.
def output_builder(worker_index, worker_count):
    def output_handler(epoch_time):
        epoch, (instance, (index, t, value, score, is_anomalous)) = epoch_time
        print(
            f"{instance}: time = {t}, value = {value:.3f}, score = {score:.2f}, {is_anomalous}"
        )
    return output_handler
flow.filter(lambda x: x[1][4])
flow.capture()
Running the dataflow
The dataflow is ready to run:
spawn_cluster(flow, input_builder, output_builder)
If you are following along in a notebook, you will see the output from the capture step and an animated plot like the one below:
fe7f93: time = 2014-02-14 20:22:00, value = 0.713, score = 0.93, True
fe7f93: time = 2014-02-14 20:27:00, value = 0.555, score = 0.88, True
fe7f93: time = 2014-02-14 23:07:00, value = 0.518, score = 0.70, True
fe7f93: time = 2014-02-15 21:02:00, value = 0.371, score = 0.80, True
fe7f93: time = 2014-02-15 21:12:00, value = 0.524, score = 0.85, True
fe7f93: time = 2014-02-17 06:17:00, value = 0.637, score = 0.91, True
fe7f93: time = 2014-02-17 07:07:00, value = 0.660, score = 0.74, True
fe7f93: time = 2014-02-17 07:47:00, value = 0.674, score = 0.74, True
fe7f93: time = 2014-02-17 08:07:00, value = 0.034, score = 0.84, True
fe7f93: time = 2014-02-17 08:12:00, value = 0.033, score = 0.84, True
fe7f93: time = 2014-02-17 08:27:00, value = 0.032, score = 0.84, True
fe7f93: time = 2014-02-17 08:52:00, value = 0.033, score = 0.84, True
...
Scaling Up
Now to scale things up a bit! The production use case has 3 partitions, so we can match the number of workers in the spawn_cluster() call to the number of partitions. Based on the structure of input_builder2, this will ingest data from the topic in parallel.
At this point, assuming we are happy with the output we saw in the previous cells with the ML algorithm parameters we passed, we no longer need to visualize the single instance, so both the filter and visualization steps have been removed from flow2, shown below.
Running the dataflow will print out the detected anomalies for all the instances:
def input_builder2(worker_index, total_workers):
    consumer = KafkaConsumer(
        'ec2_metrics',
        bootstrap_servers=["localhost:9092"],
        auto_offset_reset='earliest',
        group_id='anomaly_detector0'
    )
    for message in consumer:
        data = json.loads(message.value)
        yield data['index'], data
# Define a new dataflow object for the scaled-up run.
flow2 = Dataflow()
# (1, {"index": "1", "value":"0.11", "instance":"c6585a"})
flow2.map(group_instance_and_normalize)
# ("c6585a", {"index": "1", "value":"0.11", "instance":"c6585a"})
flow2.stateful_map(lambda key: AnomalyDetector(n_trees=5, height=3, window_size=5, seed=42), AnomalyDetector.update)
# (("c6585a", {"index": "1", "value":0.08, "instance":"fe7f93", "score":0.02}))
flow2.filter(lambda x: bool(x[1][4]))
flow2.capture()
spawn_cluster(flow2, input_builder2, output_builder, worker_count_per_proc=3)
77c1ca: time = 2014-04-02 15:05:00, value = 0.924, score = 0.93, 1
77c1ca: time = 2014-04-02 17:05:00, value = 0.732, score = 0.93, 1
77c1ca: time = 2014-04-02 17:10:00, value = 0.841, score = 0.93, 1
77c1ca: time = 2014-04-02 17:45:00, value = 0.945, score = 0.85, 1
77c1ca: time = 2014-04-02 18:15:00, value = 0.952, score = 0.75, 1
77c1ca: time = 2014-04-02 18:25:00, value = 0.765, score = 0.83, 1
fe7f93: time = 2014-02-14 19:57:00, value = 0.523, score = 0.85, 1
fe7f93: time = 2014-02-14 20:02:00, value = 0.548, score = 0.88, 1
fe7f93: time = 2014-02-14 20:07:00, value = 0.557, score = 0.88, 1
fe7f93: time = 2014-02-14 20:22:00, value = 0.713, score = 0.73, 1
77c1ca: time = 2014-04-02 21:30:00, value = 0.929, score = 0.92, 1
77c1ca: time = 2014-04-02 21:35:00, value = 0.912, score = 0.92, 1
77c1ca: time = 2014-04-02 21:40:00, value = 0.891, score = 0.92, 1
77c1ca: time = 2014-04-02 22:20:00, value = 0.516, score = 0.77, 1
77c1ca: time = 2014-04-02 22:30:00, value = 0.639, score = 0.72, 1
77c1ca: time = 2014-04-02 22:50:00, value = 0.062, score = 0.80, 1
fe7f93: time = 2014-02-14 22:57:00, value = 0.537, score = 0.88, 1
fe7f93: time = 2014-02-14 23:02:00, value = 0.515, score = 0.85, 1
77c1ca: time = 2014-04-02 22:55:00, value = 0.001, score = 0.73, 1
We think Redpanda and Bytewax are an awesome combo!
What did you think of this tutorial? Give us some feedback in our Slack or on Twitter.