Uses Bytewax Version 0.9.0
Using Server-Sent Events is an awesome way to create a data stream. In this post, we are going to subscribe to an existing SSE event stream, but if you are curious about how to set up something yourself, we have another post explaining just that here
Overview
This is a multi-part post, in this part, we are going to cover how you can ingest SSE events into a dataflow for further analysis and write the output to Redis, a low-latency data store. We will be ingesting data from the Wikimedia event-stream and grouping it by server name to count the number of edits that were made in a 30 second time window. We will then output the results into a Redis database that we will then use in Part 3 of our SSE blog series where we will visualize the results.
Getting Started
Let’s start with setting up our environment and any dependencies.
The Python library we use for SSE is sseclient-py
pip install sseclient-py bytewax==0.9.0
There is a requirement on Redis, if you went through the first part of this series, you have already installed and ran Redis, if not, please review their documentation to run it on your particular operating system.
Creating the Bytewax Dataflow
Let’s start by opening up a new python file called wikistream.py
, and adding the imports that are going to be used.
import collections
import json
import operator
from datetime import timedelta
import sseclient
import urllib3
from bytewax import Dataflow, inputs, parse, spawn_cluster
Creating an Input Builder
Bytewax dataflows require some input to be defined. In most cases this will be in the shape of an input builder. For our SSE event-stream, we create a function open_stream
that will make a request to the Wikimedia event stream endpoint. The response is a generator that can be iterated through and will yield the data from the even stream. In our code, we are using open_stream
in our input_builder
function; of note is the usage of inputs.tumbling_epoch()
, which is a Bytewax helper function that will group the results into a time window. In this case every 30 seconds. In this scenario, we want to be able to restart our dataflow and not overwrite the existing results. In order to manage this, we are going to maintain an epoch_index
key in Redis with the current epoch in it. We will get this value at the start and then when storing our results, this will be used as the key for the Redis sorted set.
def open_stream():
pool = urllib3.PoolManager()
resp = pool.request(
"GET",
"https://stream.wikimedia.org/v2/stream/recentchange/",
preload_content=False,
headers={"Accept": "text/event-stream"},
)
client = sseclient.SSEClient(resp)
for event in client.events():
yield event.data
def input_builder(worker_index, worker_count):
try:
epoch_start = int(r.get("epoch_index")) + 1
except IndexError:
epoch_start = 0
if worker_index == 0:
return inputs.tumbling_epoch(open_stream(), timedelta(seconds=30), epoch_start=epoch_start)
else:
return []
The input_builder
function above will be used when we spawn a cluster later on in our dataflow program.
It is important to note that in the case of SSE we cannot parallelize the input so the inputs will all be received by the worker of index 0. They will be distributed later in the dataflow.
Creating our Dataflow
The steps in our dataflow can be represented by the image below.
First, our inputs need to be parsed from JSON and then reshaped to (key, value)
tuples for aggregation.
def initial_count(data_dict):
return data_dict["server_name"], 1
flow = Dataflow()
# "event_json"
flow.map(json.loads)
# {"server_name": "server.name", ...}
flow.map(initial_count)
# ("server.name", 1)
In the next step, we are aggregating by server name over the epoch to get a count of edits made to each server over the time period (30 seconds). We will use the built-in operator.add
inside the Bytewax operator, reduce_epoch
. Reduce Epoch is a stateful operator, and requires that the input stream has items that are (key, value)
tuples to ensure that all relevant values are routed to the relevant aggregator. It will aggregate based on the function passed as an argument to reduce_epoch
. The returned data from reduce_epoch is a tuple of the format (server_name, count)
.
flow.reduce_epoch(operator.add)
# ("server.name", count)
Now that we have our counts, there are a few different ways we could go with our next step. One would be to reformat the data and then aggregate it again with the stateful_map
operator and write out the sorted list of servers and their counts to Redis. Alternatively, we could lean on Redis a bit here and write out our data as is to Redis, but use a sorted set so we can retrieve them easily in the next part of this series.
We are going to lean on Redis here so we can skip directly to writing out to Redis. We are going to leverage the Bytewax concepts of capture
operators and output_builders
. In our output_builder
we are going to use the Redis client to add the server counts to a sorted set under the same epoch key. There is a bit to unpack in the code below. In our script, we are going to use the spawn_cluster()
call, which allows us to manually coordinate the input and output with the number of dataflow workers we will have. spawn_cluster
takes an input_builder
which we showed above and an output_builder
that is shown below. The output_builder
returns a callback function for each worker thread, called with (epoch, item)
whenever an item passes by a capture operator on this process. In our dataflow, we take the epoch and item and use the Redis zadd
method to add them to a sorted set. After successfully adding the counts to the sorted set, we will set the epoch_index
to match the current epoch.
def output_builder(worker_index, worker_count):
def write_to_redis(data):
epoch, (server_name, counts) = data
r.zadd(str(epoch), {server_name: counts})
r.set("epoch_index", epoch)
return write_to_redis
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
r = redis.Redis(connection_pool=pool)
flow.capture()
Now that we have our dataflow defined, we can add the code to coordinate the dataflow to our python file.
if __name__ == "__main__":
spawn_cluster(flow, input_builder, output_builder, **parse.cluster_args())
And run it! Optionally you can increase the number of workers in the command below by adding -w 3
as an argument. This will not increase the ability to ingest in a parallel manner because of the SSE event stream not providing an ability to ingest in parallel.
python wikistream.py
You won't see any output at this point, but after interrupting your process after a couple of minutes, you can start a Python REPL and run the code below. You should see the output of your batched counts!
>>> import redis
>>> r = redis.Redis()
>>> r.zrange("1",0,-1, withscores=True, desc=True)
[(b'commons.wikimedia.org', 16.0), (b'en.wikipedia.org', 7.0), (b'www.wikidata.org', 7.0), (b'ceb.wikipedia.org', 3.0), (b'fa.wikipedia.org', 3.0), (b'fr.wikipedia.org', 3.0), (b'vi.wikipedia.org', 2.0), (b'az.wikipedia.org', 1.0), (b'de.wikipedia.org', 1.0), (b'id.wikipedia.org', 1.0), (b'it.wikipedia.org', 1.0), (b'nl.wikipedia.org', 1.0), (b'tr.wikipedia.org', 1.0)]
If you’ve enjoyed this article or have some feedback? Come join us on our Slack channel and give the repo a star!
Stay updated with our newsletter
Subscribe and never miss another blog post, announcement, or community event.