Like our project? Star us on Github!
Star

Dealing With Missing Values for Real-Time AI

AI.

As we all know, the real world is never perfect and as a consequence of that imperfection, we often end up with imperfect datasets. Datasets with missing values or mismatched types and other imperfections. Missing values often need to be dealt with in machine learning so that a model is fit correctly and so a prediction can be made.

When data is missing, we have an incomplete representation of the system for some amount of time. This can happen due to a sensor malfunction, some network error or maybe some optional data fields. If we are using this data to make predictions in real-time, the consequences of misinterpreting missing data incorrectly could be disastrous. Imagine a self driving car, a heating system or a multitude of other smart and connected devices.

Data Heart

The process for dealing with missing values on a static dataset is a well documented process. In some cases we simply remove the data, but in other cases there is other data associated with that single row of data and we would like to keep it and use it. The latter is what is referred as imputing the value of the missing data. Most often you can use methods in Python libraries like pandas, scikit-learn, tensorflow or others to convert the missing values to something else. The most common methods used are statistical because they are easy to calculate and effective. These include the mean, median, mode or just a constant. What is important is that the imputed value does not change the data so much that it will influence the output. With a known dataset the bounds of the problem or known, making this easier, but what if the bounds of the problem are always changing or unknown?

When you are dealing with an online learning ML system, one that is being update in real time, imputing the value is not as straightforward as it would be for a static dataset. For instance, has the data drifted and the distribution of the data changed? Do you use a window and if so, how large? How do you update the value and do you record the change?

Things can get quite complex and there are many recent papers on methods to use for missing value imputation on data streams like Online Missing Value Imputation and Change Point Detection with the Gaussian Copul or An Exploration of Online Missing Value Imputation in Non-stationary Data Stream. But let’s dip our toes into this by looking at a more simple and contrived example. For this example, we will use a moving average approach to impute the missing value. To do this we will use Bytewax, a stateful stream processor that is Python native, so we can take advantage of the some of our usual libraries like Numpy.

Setting up our Environment

Let’s start by installing Bytewax and Numpy in our environment.

pip install bytewax numpy

Writing the Code

Bytewax is based around the concepts of a dataflow. A dataflow is made up of a sequence of operators that interact with data that is “flowing” through. If you want to better understand, I suggest checking out the documentation. Alright, so let’s start by creating a Python file called missing_vals.py where we will write our dataflow. The code will comprise three main parts and will leverage Bytewax operators and input mechanisms.

  1. An input mechanism for a stream of data.
  2. A Stateful Map function that we can use to maintain a window and calculate the mean
  3. An output mechanism for the updated data

The Input Code

For the input we will create a stream of random data points via a generator function that can be called when the Bytewax process starts. The code block below also includes the imports if you are following along and moving the code snippets into a separate file.

import random
import numpy as npfrom bytewax import Dataflow, inputs, parse, run_cluster


def random_datapoints():
    for epoch in range(100):
        if epoch % 5 == 0:
            yield f'data', np.nan
        else:
            yield f'data', random.randrange(0, 10)

In the input generator above we will yield a numpy nan value for every 5th item in our loop. Otherwise we will yield an integer between 0 and 10.

The Stateful Map Code

Before we dive into the code, it is important to understand the stateful map operator. Stateful map is a one-to-one transformation of values in (key, value) pairs, but allows you to reference a persistent state for each key when doing the transformation. The stateful map operator has two parts to it: a builder function and a mapper function. The builder function will get evoked for each new key and the mapper will get called for every new data point. For more information on how this works, the api docs have a great explanation.

flow = Dataflow()
flow.stateful_map(lambda key: builder_function, updater_function)

In our case our key will be the same for the entire stream because we only have one stream of data in this example. So below we have written some code that will create a WindowedArray object in the builder function and then use the update function to impute the mean.

class WindowedArray:
    """Windowed Numpy Array.
    Create a numpy array to run windowed statistics on.
    """    def __init__(self, window_size):
        self.last_n = np.empty(0, dtype=object)
        self.n = window_size    def _push(self, value):
        self.last_n = np.insert(self.last_n, 0, value)
        try:
            self.last_n = np.delete(self.last_n, self.n)
        except IndexError:
            pass    def impute_value(self, value):
        self._push(value)
        if np.isnan(value):
            new_value = np.nanmean(self.last_n)
        else:
            new_value = value
        return self, (value, new_value)

Let’s unpack the above code. When our class WindowedArray is initialized, it will create an empty Numpy array with dtype of object. This will allow us to add both integers and Nan values. For each new data point that we receive, we will instruct the stateful map operator to use the impute_value method that will check if the value is nan and then calculate the mean. It will also add the value to our window (last_n).

The Output Code

Next up we will have an output mechanism, this will be leveraged via the capture operator in the Bytewax library. This is not going to do anything sophisticated, just print out the data and the imputed value if necessary.

def inspector(epoch, data):
    metric, (value, imputed) = data
    print(f"data: {value}, imputed value if required {imputed}")

The Dataflow Code

Now to construct the dataflow with the operators and the functions we have defined above. The code below will create a dataflow object, add the operators and their respective functions and then define the code to create a cluster with the input and process the data.

flow = Dataflow()
# ("metric", value)
flow.stateful_map(lambda key: Windowed_Array(10), Windowed_Array.impute_value)
# ("metric", (old value, new value))
flow.capture()if __name__ == "__main__":
    for epoch, item in run_cluster(
        flow, inputs.fully_ordered(random_datapoints()),
        **parse.cluster_args()):
        inspector(epoch, item)

That’s it! To run the code simply run it like an ordinary python file on your machine.

python missing_vals.py

Like Bytewax and working with streaming data? Head over to the project’s GitHub repository to check out more examples and give us a star to follow the project.

Star us on Github
Join our Slack community! Join our Slack!
Any questions? Join our community! Join our Slack!