Handling Missing Values in Data Streams
Real-world datasets are rarely perfect and often contain missing values. To build accurate machine learning models, we must address those gaps. When data is missing, our understanding of the system is incomplete, potentially due to issues such as sensor failure, network problems, or optional data fields. In real-time applications like self-driving cars, heating systems, and smart devices, misinterpreting missing data can have severe consequences. The process of dealing with missing values is called imputation, and we will demonstrate how you can build a custom window in Bytewax to handle it.
Python modules: bytewax, numpy
Learn how to create a custom sliding window with the stateful map operator to impute values using numpy.
Bytewax is based around the concept of a dataflow. A dataflow is made up of a sequence of operators that interact with data as it “flows” through. For more information, please check out the documentation.
To start, we create a dataflow object and then add an input. The input is based on a Python generator. In our case, we will mock up some "live" data that yields a numpy nan value for every 5th item in a loop; otherwise it yields an integer between 0 and 10. We will use this generator function to create a stream of random data points. When the Bytewax process starts, it will call our function random_datapoints on each worker (1 in this instance). The type of input is specified in the bytewax.Dataflow.input method, and we are using ManualInputConfig for our custom input.
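The generator might look like the following sketch. The name random_datapoints and the 100-item loop length are our assumptions; the shape of each yielded tuple, a resumable state paired with a (key, value) item, follows the ManualInputConfig convention described above and may differ in your Bytewax version.

```python
import random

import numpy as np


def random_datapoints(worker_index, worker_count, resume_state):
    # Mock "live" data: every 5th item is missing (np.nan); the rest
    # are random integers between 0 and 10. Each yielded tuple pairs a
    # resumable state (unused here, so None) with a ("data", value) item.
    for i in range(100):
        if i % 5 == 0:
            yield None, ("data", np.nan)
        else:
            yield None, ("data", random.randrange(0, 10))
```

Because every item shares the key "data", all values will later be routed to the same piece of state.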
Custom Window Using Stateful Map
Before we dive into the code, it is important to understand the stateful map operator. Stateful map is a one-to-one transformation of values in (key, value) pairs, but it allows you to reference a persistent state for each key when doing the transformation. The stateful map operator has two parts: a builder function and a mapper function. The builder function will be invoked for each new key, and the mapper will be called for every new data point. For more information on how this works, the API docs have a great explanation.
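To build intuition before the Bytewax version, here is a plain-Python analogue of the stateful map pattern. This toy function is our own illustration of the builder/mapper semantics, not Bytewax's implementation.

```python
def stateful_map(stream, builder, mapper):
    """Plain-Python analogue of a stateful map: one persistent state
    object per key, threaded through every (key, value) pair."""
    states = {}
    for key, value in stream:
        if key not in states:
            states[key] = builder()  # builder runs once per new key
        # mapper returns (updated_state, output_value)
        states[key], result = mapper(states[key], value)
        yield key, result


# Usage: a per-key running sum.
pairs = [("a", 1), ("b", 10), ("a", 2), ("b", 20)]
out = list(stateful_map(pairs, lambda: 0, lambda total, v: (total + v, total + v)))
# out == [("a", 1), ("b", 10), ("a", 3), ("b", 30)]
```

Note the mapper's contract: it receives the current state and a value and returns the updated state alongside the output, which is the same shape Bytewax expects.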
In our case, the key will be the same for the entire stream because we only have one stream of data in this example. So we have some code that creates a WindowedArray object in the builder function and then uses the update function to impute the mean.
Let’s unpack the code. When our class WindowedArray is initialized, it creates an empty numpy array with a dtype of object. The reason for the object datatype is that it allows the array to hold both integers and nan values. For each new data point that we receive, we instruct the stateful map operator to use the impute_value method, which checks whether the value is nan and, if so, calculates the mean from the last n values, n being the size of the array of values we've "remembered". In other words, n is how many of the values we care about and want to use in our calculation; this will vary with the application. The method also adds the value to our window (last_n).
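A sketch of the WindowedArray class, following the description above; the method names push and impute_value match the text, but the exact details may differ from the original code.

```python
import numpy as np


class WindowedArray:
    """Keep the last n values and impute missing ones with their mean."""

    def __init__(self, window_size):
        # dtype=object lets the array hold both integers and np.nan
        self.last_n = np.empty(0, dtype=object)
        self.n = window_size

    def push(self, value):
        # Only remember real observations, never the nans themselves
        if np.isscalar(value) and not np.isnan(value):
            self.last_n = np.insert(self.last_n, 0, value)
            try:
                self.last_n = np.delete(self.last_n, self.n)
            except IndexError:
                pass  # window not full yet, nothing to evict

    def impute_value(self, value):
        # Mapper for stateful map: returns (updated_state, output_item).
        # Note: np.mean of an empty window is nan, so the first items
        # cannot be imputed until at least one real value arrives.
        if np.isnan(value):
            new_value = np.mean(self.last_n)  # impute from the window
        else:
            new_value = value
        self.push(new_value)
        return self, (value, new_value)
```

The mapper emits a (original, imputed) pair so the output shows both what arrived and what was filled in.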
Next up, we will use the capture operator to write our results to an output, in this case StdOutputConfig. This does nothing sophisticated; it simply writes the data and the imputed value to standard output.
Now we put it all together and add the execution method. In this case, we want a single, in-process dataflow worker, so we use run_main as the execution method and provide it with the dataflow object.
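Assembled end to end, the dataflow reads roughly like the pseudocode sketch below. It targets the pre-0.16 Bytewax API named in this post (ManualInputConfig, capture, StdOutputConfig, run_main); the import paths, the window size of 10, and the builder signature are assumptions that may differ by Bytewax version, so treat this as untested pseudocode.

```
from bytewax.dataflow import Dataflow
from bytewax.inputs import ManualInputConfig
from bytewax.outputs import StdOutputConfig
from bytewax.execution import run_main

flow = Dataflow()
flow.input("input", ManualInputConfig(random_datapoints))
# One WindowedArray per key; impute_value maps each value to an
# (original, imputed) pair while updating the window state.
# (Depending on the Bytewax version, the builder may receive the key.)
flow.stateful_map("windowed_array", lambda: WindowedArray(10), WindowedArray.impute_value)
flow.capture(StdOutputConfig())

if __name__ == "__main__":
    run_main(flow)
```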
That’s it! To run the code, simply run it like an ordinary Python file on your machine.