How to Handle Missing Values in Real-Time ML & AI in Python?
Use a moving average approach to impute the missing value. See the hands-on code example in Python on our GitHub.
Given that the real world is never ideal, our datasets are often far from perfect, containing missing values, mismatched data types, and other imperfections. In order to build accurate machine learning models, we must address these missing values. When data is missing, our understanding of the system is incomplete, perhaps because of sensor failure, network problems, or optional data fields. In real-time applications like self-driving cars, heating systems, and smart devices, incorrect interpretation of missing data can have severe consequences.
Dealing with missing values in a static dataset is a well-documented process. In some cases we simply drop the incomplete rows, but in other cases the rest of the row contains useful data that we would like to keep and use.
The latter is what is referred to as imputing the value of the missing data. Most often you can use methods in Python libraries like pandas, scikit-learn, TensorFlow, or others to convert the missing values to something else. The most common methods are statistical because they are easy to calculate and effective. These include the mean, median, mode, or just a constant. The crucial aspect of data imputation is to ensure that the replacement does not significantly alter the data and influence the results. With a known dataset, it's easier to determine the bounds of the problem, but what happens when the bounds are constantly changing or unknown?
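As a quick illustration of static imputation, here is a minimal NumPy sketch (the `readings` array is a made-up example) that replaces each missing entry with the mean of the observed values:

```python
import numpy as np

# A hypothetical static dataset with two missing readings (NaN).
readings = np.array([3.0, np.nan, 5.0, 7.0, np.nan])

# Mean imputation: compute the mean over the observed values only,
# then substitute it wherever a value is missing.
mean_value = np.nanmean(readings)
imputed = np.where(np.isnan(readings), mean_value, readings)

print(imputed)  # NaNs are replaced by the mean of 3, 5 and 7, i.e. 5.0
```

The same pattern works with the median or mode; the choice depends on how sensitive your model is to outliers.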
In the context of an online machine learning system that is updated in real-time, imputing missing values becomes more complicated than in a static dataset scenario for a variety of reasons. For example, has the distribution of the data changed due to drift? Should a window be used, and if so, what size should it be? How should the updated value be determined, and should changes be recorded?
Things can get quite complex, and there are many recent papers on methods for missing value imputation on data streams, such as "Online Missing Value Imputation and Change Point Detection with the Gaussian Copula" or "An Exploration of Online Missing Value Imputation in Non-stationary Data Streams".
However, we can explore the topic by looking at a simple example that uses a moving average approach to impute the missing value. As an ML enthusiast, you probably use Python and are familiar with the usual libraries like NumPy. To take full advantage of the Python ecosystem and to simplify setup, we will use Bytewax, a Python-native stateful stream processor.
To keep this example as simple as possible, our goals are:
- Read inputs (we will mock a stream of data)
- Impute missing values by calculating the mean within a window
- Print the updated data
In production, we could easily replace the mocks with real data sources and send the results downstream, but for the purposes of this blog post we will keep things simple.
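The core idea, stripped of any streaming framework, fits in a few lines of plain Python. Here is a sketch with made-up names, where `None` stands in for a missing value and we assume the stream starts with an observed value:

```python
from collections import deque


def impute_stream(values, window_size=10):
    """Yield each value, replacing a missing one (None) with the
    mean of the last `window_size` values seen so far."""
    window = deque(maxlen=window_size)
    for value in values:
        if value is None and window:
            # Impute: moving average over the current window.
            value = sum(window) / len(window)
        window.append(value)
        yield value


print(list(impute_stream([1, 2, None, 4])))  # → [1, 2, 1.5, 4]
```

The rest of the post builds the same logic as a Bytewax dataflow, which adds per-key state management and lets the computation run over a real stream.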
Let’s start by installing NumPy and Bytewax in our environment.

```
pip install numpy bytewax
```
Bytewax is based around the concept of a dataflow. A dataflow is made up of a sequence of operators that interact with data that is “flowing” through. For more information, please check out the documentation.
To begin, let's create a dataflow:

```python
from bytewax.dataflow import Dataflow

flow = Dataflow()
```
The Input Code
Let's consider the following generator. It will yield a NumPy NaN value for every 5th item in a loop. Otherwise, it will yield a random integer between 0 and 9.
```python
import random

import numpy as np


def random_datapoints(worker_index, worker_count, state):
    state = None
    for i in range(100):
        if i % 5 == 0:
            yield state, ('data', np.nan)
        else:
            yield state, ('data', random.randrange(0, 10))
```
We will use this generator function to create a stream of random data points.
```python
from bytewax.inputs import ManualInputConfig

flow.input("input", ManualInputConfig(random_datapoints))
```
When the Bytewax process starts, it will call our function `random_datapoints`, as specified in its input configuration.
The Stateful Map Code
Before we dive into the code, it is important to understand the stateful map operator. Stateful map is a one-to-one transformation of values in (key, value) pairs, but it allows you to reference a persistent state for each key when doing the transformation. The stateful map operator has two parts: a builder function and a mapper function. The builder function will be invoked for each new key, and the mapper will be called for every new data point. For more information on how this works, the API docs have a great explanation.
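Conceptually, you can picture stateful map as the following plain-Python sketch (an analogy to clarify the builder/mapper split, not the Bytewax implementation):

```python
def stateful_map(stream, builder, mapper):
    """Toy model of the operator: one persistent state object per key."""
    states = {}
    for key, value in stream:
        if key not in states:
            states[key] = builder()  # builder runs once per new key
        # mapper runs for every item and returns (updated state, result)
        states[key], result = mapper(states[key], value)
        yield key, result


# Example: a running sum per key.
stream = [("a", 1), ("a", 2), ("b", 5)]
out = list(stateful_map(stream,
                        builder=lambda: 0,
                        mapper=lambda state, v: (state + v, state + v)))
print(out)  # → [('a', 1), ('a', 3), ('b', 5)]
```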
```python
flow.stateful_map("windowed_array", lambda: WindowedArray(10), WindowedArray.impute_value)
```
In our case, the key will be the same for the entire stream because we only have one stream of data in this example. Below we have written some code that will create a `WindowedArray` object in the builder function and then use the `impute_value` method to impute the mean.
```python
class WindowedArray:
    """Windowed Numpy Array.

    Create a numpy array to run windowed statistics on.
    """

    def __init__(self, window_size):
        self.last_n = np.empty(0, dtype=object)
        self.n = window_size

    def _push(self, value):
        self.last_n = np.insert(self.last_n, 0, value)
        try:
            self.last_n = np.delete(self.last_n, self.n)
        except IndexError:
            pass

    def impute_value(self, value):
        self._push(value)
        if np.isnan(value):
            new_value = np.nanmean(self.last_n)
        else:
            new_value = value
        return self, (value, new_value)
```
Let’s unpack the above code. When our class `WindowedArray` is initialized, it creates an empty NumPy array with dtype of `object`. The reason for the `object` datatype is that it allows us to store both integers and NaN values. For each new data point that we receive, the stateful map operator will call the `impute_value` method, which checks whether the value is NaN and, if so, calculates the mean from the last `n` values, `n` being the size of the array of values we've "remembered". In other words, `n` is how many of the values we care about and want to use in our calculation; this will vary depending on the application. The method also adds the new value to our window (`last_n`).
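To convince yourself of the windowed-mean behavior in isolation, you can reproduce the NaN-ignoring mean directly (a standalone snippet, separate from the dataflow):

```python
import numpy as np

# A window holding the two most recent observations plus the missing value.
window = np.array([np.nan, 6, 4], dtype=object)

# np.nanmean ignores NaN entries, so the imputed value is mean(6, 4) = 5.0.
imputed = np.nanmean(window.astype(float))
print(imputed)  # → 5.0
```

Note that if the window contained only NaNs (for example, on the very first item of the stream), `np.nanmean` would return NaN and emit a RuntimeWarning, so the imputed value would still be missing.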
The Output Code
Next up is our output mechanism, which is handled via the capture operator in the Bytewax library. It is not going to do anything sophisticated; it just writes the original value and the imputed value to standard output.
Now let's put it all together and add the execution method. In this case, we want a single, in-process dataflow worker, so we use `run_main` as the execution method and provide it with the dataflow object.
```python
from bytewax.execution import run_main
from bytewax.outputs import StdOutputConfig

flow = Dataflow()
flow.input("input", ManualInputConfig(random_datapoints))
# ("metric", value)
flow.stateful_map("windowed_array", lambda: WindowedArray(10), WindowedArray.impute_value)
# ("metric", (old value, new value))
flow.capture(StdOutputConfig())

if __name__ == "__main__":
    run_main(flow)
```
That’s it! To run the code, simply execute it like an ordinary Python file on your machine.
Like Bytewax and working with streaming data? Head over to the project’s GitHub repository to check out more examples and give us a star to follow the project.
If you have any trouble with the process or have ideas about how to improve this document, come talk to us in the #troubleshooting Slack channel!