Building Sessions from Search Logs
Here is a basic example of using Bytewax to turn an incoming stream of event logs from a hypothetical search engine into metrics over search sessions. In this example, we're going to focus on the dataflow itself and aggregating state, and gloss over details of building this processing into a larger system.
Python modules: `bytewax`
This guide will teach you how to use Bytewax to aggregate streaming data into custom session windows using the reduce operator, and then calculate metrics downstream.
Let's start by defining a data model / schema for our incoming events. We'll make a little model class for all the relevant events we'd want to monitor.
In a more mature system, these might come from an external schema or be auto-generated.
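As a sketch, a single dataclass could cover all the event variants we care about (the class and field names here are illustrative assumptions, not the guide's exact model):

```python
from dataclasses import dataclass

# Hypothetical event model; names and fields are assumptions for illustration.
@dataclass
class Event:
    user: int     # ID of the user who generated the event
    type: str     # one of: "app_open", "search", "results", "click", "app_close"
    payload: str = ""  # e.g. the search query or the clicked result
```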
Generating Input Data
Now that we've got those, here's a small dump of some example data you could imagine coming from your app's events infrastructure.
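For instance, the stream might contain two users' activity like the following (a hypothetical dump matching the searches discussed later; events are plain dicts here so the snippet stands alone):

```python
# Hypothetical example events: user 1 searches "dogs" and "cats" and clicks a
# result for each; user 2 searches "fruit" but never clicks a result.
client_events = [
    {"user": 1, "type": "app_open"},
    {"user": 1, "type": "search", "payload": "dogs"},
    {"user": 1, "type": "results"},
    {"user": 1, "type": "click", "payload": "dog-result"},
    {"user": 1, "type": "search", "payload": "cats"},
    {"user": 1, "type": "results"},
    {"user": 1, "type": "click", "payload": "cat-result"},
    {"user": 1, "type": "app_close"},
    {"user": 2, "type": "app_open"},
    {"user": 2, "type": "search", "payload": "fruit"},
    {"user": 2, "type": "results"},
    {"user": 2, "type": "app_close"},
]
```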
The input of a dataflow expects a generator, which yields the events one at a time.
For the moment, we aren't going to be using our resume state to manage failures, but returning the empty state is a requirement for our input builder.
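A minimal input builder might look like this (a sketch assuming a builder signature of worker index, worker count, and resume state; `client_events` is stubbed here so the snippet runs on its own):

```python
client_events = [
    {"user": 1, "type": "app_open"},
    {"user": 1, "type": "app_close"},
]  # stubbed sample data for illustration

def input_builder(worker_index, worker_count, resume_state):
    # We aren't using resume_state for recovery here, but each yielded
    # item still pairs an (empty) state with the event.
    state = None
    for event in client_events:
        yield state, event
```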
Now that we have some input data, let's start defining the computational steps of our dataflow based on our plan.
We will import the `Dataflow` class from `bytewax.dataflow` to create our dataflow, and the `ManualInputConfig` class to define our input. A `ManualInputConfig` takes an `input_builder` function and allows us to specify a custom input source; in our case, the sample data we defined above.
Now that we have a Dataflow and some input, we can add a series of steps to the dataflow. Steps are made up of operators, which provide a "shape" of transformation, and logic functions, which you supply to do your specific transformation. You can read more about all the operators in our documentation.
Our first task is to group incoming events by user, since no session spans multiple users.
All Bytewax operators that perform grouping require their input to be in the form of a `(key, value)` tuple, where `key` is the string the dataflow will group by before passing values to the operator logic.
The operator that modifies all data in the stream, one item at a time, is `map`. The map operator takes a Python function as an argument, and that function transforms each item.
Here we use the map operator with an initial_session function that will pull each event's user ID as a string into that key position.
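Sketching that step (events as plain dicts, a simplifying assumption; the wiring comment shows the assumed dataflow API):

```python
def initial_session(event):
    # Key by the user ID (as a string) and start a one-event session list.
    return str(event["user"]), [event]

# Assumed wiring into the dataflow: flow.map(initial_session)
```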
For the value, we're planning ahead to our next task: sessionization. The operator best shaped for this is `reduce`, which groups items by key and then combines them together into an aggregator in order. We can think of our reduce step as "combine sessions together if they should be joined". We'll model a session as a list of events, so we can start by initializing each event as a single-item list `[event]`, and then combine lists with our reducer function.
Reduce requires two bits of logic:

- How do I combine sessions? Since sessions are just Python lists, we can use the `+` operator to add them (via the built-in `operator.add`).
- When is a session complete? In this case, a session is complete when the last item in the session is the app closing. We can write a `session_has_closed` function to answer that.
The reduce operator takes a unique step ID to help organize the state saved internally along with the reducer function and the close function.
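A sketch of those two pieces of logic (events as plain dicts with a `type` field; the step ID and wiring shown in the comment are assumptions):

```python
import operator

def session_has_closed(session):
    # A session is complete once its final event is the app closing.
    return session[-1]["type"] == "app_close"

# Sessions are lists, so the built-in operator.add concatenates them.
# Assumed wiring: flow.reduce("sessionizer", operator.add, session_has_closed)
```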
We had to group by user because sessions are per-user, but now that we have sessions, the grouping key is no longer necessary for metrics. We can write a function that will remove the key.
And then add the transformation to our dataflow with a map operator.
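That key-removal step could be sketched as (the wiring comment is an assumption):

```python
def remove_key(user_session):
    # Drop the user key; downstream metrics only need the session itself.
    user, session = user_session
    return session

# Assumed wiring: flow.map(remove_key)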
Our next task is to split user sessions into search sessions. To do that, we'll use the `flat_map` operator, which allows you to emit multiple items downstream (search sessions) for each input item (user session).
We walk through each user session's events and, whenever we encounter a search, emit the previous events downstream. This works just like `str.split`, but with objects.
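A sketch of that splitting logic (plain-dict events; the wiring comment is an assumption):

```python
def split_into_searches(user_session):
    # Start a new chunk each time a search event appears, emitting the
    # events gathered so far, like str.split with searches as separators.
    search_session = []
    for event in user_session:
        if event["type"] == "search":
            yield search_session
            search_session = []
        search_session.append(event)
    yield search_session

# Assumed wiring: flow.flat_map(split_into_searches)
```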
The `filter` operator allows us to remove any items from a stream that don't match specific criteria. It is used in conjunction with a Python function that describes the rules.
In this case, we can use it to get rid of all search sessions that don't contain searches and shouldn't contribute to metrics.
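For example (plain-dict events again; the wiring comment is an assumption):

```python
def has_search(search_session):
    # Keep only sessions that actually contain a search event.
    return any(event["type"] == "search" for event in search_session)

# Assumed wiring: flow.filter(has_search)
```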
We can now move on to our final task: calculating metrics per search session in a map operator.
If there's a click during a search, the CTR is 1.0 for that search, and 0.0 otherwise. Given those two extreme values, we can do further statistics downstream to get things like CTR per day, etc.
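A sketch of that metric calculation (plain-dict events; the wiring comment is an assumption):

```python
def calc_ctr(search_session):
    # 1.0 if the user clicked any result during this search, else 0.0.
    if any(event["type"] == "click" for event in search_session):
        return 1.0
    return 0.0

# Assumed wiring: flow.map(calc_ctr)
```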
Now that our dataflow is done, we can define an output. In this example, we're just sending the output to Standard Output.
Now we're done with defining the dataflow. Let's run it!
Bytewax provides a few different entry points for executing your dataflow, but because we're focusing on the dataflow itself in this example, we're going to use `bytewax.execution.run_main`, the most basic execution mode, which runs a single worker in the main process.
We can run this file locally by installing bytewax and running the Python file. The recommended way is to use Docker; you can run the commands in the `run.sh` script.
Let's inspect the output and see if it makes sense.
Since the capture step is immediately after calculating CTR, we should see one output item for each search session. That checks out! There were three searches in the input: "dogs", "cats", and "fruit". Only the first two resulted in a click, so they contributed
1.0 to the CTR, while the no-click search contributed 0.0.