Recoverable Streaming Shopping Cart Application

In this example, we're going to build a small online order fulfillment system. It will join two event streams: one containing customer orders and one containing successful payments, and then emit which orders for each customer have been paid. We're also going to design it to be recoverable and restartable after encountering invalid data.

Skill level: Intermediate
Time to complete: 15 min


Sample Data

Make a file named cart-join.json with the following data:

{"user_id": "a", "type": "order", "order_id": 1}
{"user_id": "a", "type": "order", "order_id": 2}
{"user_id": "b", "type": "order", "order_id": 3}
{"user_id": "a", "type": "payment", "order_id": 2}
{"user_id": "b", "type": "order", "order_id": 4}
FAIL HERE
{"user_id": "a", "type": "payment", "order_id": 1}
{"user_id": "b", "type": "payment", "order_id": 4}

Python modules: bytewax

Your takeaway

This tutorial will show you how to build a recoverable streaming application that aggregates shopper data into a shopping cart, and that can resume where it left off if it fails.

Step 1. Dataflow

A dataflow is the unit of workload: a data-parallel directed acyclic graph made up of processing steps. We will walk through the construction of a dataflow that joins orders together into our shopping cart.

We will start with an empty dataflow.

Step 2. Input

In a production application you would most likely be using something like Kafka or Redpanda as the input source. In this scenario we will build our own input source from the file we created earlier, using the manual input configuration. The builder will read the file line by line and emit each raw line, keeping track of the line number as it goes. We'll use the line number later in this example when we work on recovery.
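A sketch of such a builder, assuming the bytewax 0.15-era `ManualInputConfig` contract: the builder is called with `worker_index`, `worker_count`, and `resume_state`, and yields `(state, item)` two-tuples, where bytewax checkpoints the state and passes the item downstream. The exact signature and resume semantics may differ in your bytewax version. The sketch writes a small scratch copy of the file so it is self-contained:

```python
import json
from pathlib import Path

# Scratch copy of the first two sample lines, so we don't clobber the real file.
PATH = Path("cart-join-demo.json")
PATH.write_text(
    '{"user_id": "a", "type": "order", "order_id": 1}\n'
    '{"user_id": "a", "type": "order", "order_id": 2}\n'
)

def input_builder(worker_index, worker_count, resume_state):
    """Yield (state, item) pairs: state is the line number to resume from,
    item is the raw line. On restart, skip lines we already processed."""
    resume_line = resume_state or 0
    with open(PATH) as f:
        for line_number, line in enumerate(f):
            if line_number < resume_line:
                continue  # already processed before the restart
            yield line_number + 1, line.strip()

items = list(input_builder(0, 1, None))
```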

Each of the lines in the file is a JSON encoded string. Let's add a step to decode our input into a Python dictionary.
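A minimal sketch of that decode step (we'll revisit it when the dataflow hits a bad input line later in the tutorial):

```python
import json

def deserialize(line):
    """Map step: parse one JSON-encoded event into a Python dictionary."""
    return json.loads(line)

event = deserialize('{"user_id": "a", "type": "order", "order_id": 1}')
```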

Our plan is to use the stateful map operator to actually do the join. All stateful operators require their input data to be a (key, value) tuple so that the system can route all items with the same key to the same state. Let's add that key field using the user_id field present in every event.
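The keying step is a simple map (the function name here is our own):

```python
def key_off_user_id(event):
    """Stateful operators need (key, value) tuples; route by user_id."""
    return event["user_id"], event

keyed = key_off_user_id({"user_id": "a", "type": "order", "order_id": 1})
```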

Now onto the join itself. Stateful map needs two callbacks: one that builds the initial, empty state, and one that combines new items into the state and emits a value downstream.

Our builder function will create the initial, empty object that holds the relevant data for each user.

The constructor of the class can be re-used as the initial state builder.
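A sketch of such a state class (the name `ShoppingCart` is ours; the original may differ). Because its constructor takes no arguments and returns the empty state, the class itself can be passed as the builder callback, assuming your bytewax version's `stateful_map` calls the builder with no arguments:

```python
class ShoppingCart:
    """Per-user join state: order ids seen, split by payment status."""

    def __init__(self):
        self.unpaid_order_ids = []
        self.paid_order_ids = []

cart = ShoppingCart()  # what the builder produces for a newly seen key
```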

Now we need the join logic, which will return two values: the updated state and the item to emit downstream. Since we'd like to continuously emit the most up-to-date join info, we'll emit the state again.

The items that stateful operators emit also have the relevant key still attached, so in this case we have (user_id, joined_state). Let's format that into a dictionary for output.
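A sketch of that formatting step (the function name is ours), flattening the `(user_id, joined_state)` tuple into one output record:

```python
def format_output(user_id__cart):
    """Flatten (key, joined_state) into a single output dictionary."""
    user_id, cart = user_id__cart
    return {
        "user_id": user_id,
        "paid_order_ids": cart["paid_order_ids"],
        "unpaid_order_ids": cart["unpaid_order_ids"],
    }

record = format_output(("a", {"paid_order_ids": [2], "unpaid_order_ids": [1]}))
```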

Then we can serialize the final output as JSON and print it.

And finally, capture this output and send it to STDOUT.
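Putting the steps together, the wiring looks roughly like this under the bytewax 0.15-era API (`Dataflow`, `ManualInputConfig`, `StdOutputConfig`), using our hypothetical names for the callbacks described in the steps above. Operator and module names have moved around between bytewax releases, so treat this as a version-dependent sketch rather than runnable code:

```python
import json

from bytewax.dataflow import Dataflow
from bytewax.inputs import ManualInputConfig
from bytewax.outputs import StdOutputConfig

flow = Dataflow()
flow.input("input", ManualInputConfig(input_builder))
flow.map(deserialize)
flow.map(key_off_user_id)
flow.stateful_map("join", ShoppingCart, joiner)
flow.map(format_output)
flow.map(json.dumps)  # serialize each record for printing
flow.capture(StdOutputConfig())

# The entry point and the --sqlite-directory / --epoch-interval flags are
# also version-specific; 0.15-era bytewax exposed run_main() together with
# a SqliteRecoveryConfig for this purpose.
```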

At this point our dataflow is constructed, and we can run it. Here we're setting our current directory as the path for our SQLite recovery store, and setting our epoch interval to 0, so that we can create a checkpoint of our state for every line in the file:

> python -m dataflow --sqlite-directory . --epoch-interval 0

{'user_id': 'a', 'paid_order_ids': [], 'unpaid_order_ids': [1]}
{'user_id': 'a', 'paid_order_ids': [], 'unpaid_order_ids': [1, 2]}
{'user_id': 'b', 'paid_order_ids': [], 'unpaid_order_ids': [3]}
{'user_id': 'a', 'paid_order_ids': [2], 'unpaid_order_ids': [1]}
{'user_id': 'b', 'paid_order_ids': [], 'unpaid_order_ids': [3, 4]}

TypeError: JSONDecodeError.__init__() missing 2 required positional arguments: 'doc' and 'pos'

Something went wrong! In this case it was a non-JSON line (`FAIL HERE`) in the input, but you could imagine the VM being killed, or something else going wrong!
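That `TypeError` is what Python raises when `json.JSONDecodeError` is constructed without its required `doc` and `pos` arguments, so the deserialize step in the run above evidently wrapped the parse in a try/except with a bug. A reconstruction (ours, not the original code) that reproduces the same failure:

```python
import json

def deserialize(line):
    try:
        return json.loads(line)
    except json.JSONDecodeError:
        # BUG: JSONDecodeError requires (msg, doc, pos); constructing it
        # with only a message raises the TypeError seen above instead.
        raise json.JSONDecodeError("failed to decode line")

deserialize('{"user_id": "a", "type": "order", "order_id": 1}')  # fine
```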

We've also built up very valuable state in our stateful map operator and we don't want to pay the penalty of having to re-read our input all the way from the beginning. Thankfully, we enabled recovery when running our Dataflow, and after we fix the bug, we can resume from where we left off.

If we change this step to use our bugfixed function, we can re-run the dataflow and finish processing the items in the file.
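One way such a bugfixed function could look (a sketch; the original's behavior on bad lines may differ). It parses valid lines and skips anything that isn't JSON, so a downstream filter step would need to drop the `None` values before keying:

```python
import json

def deserialize_bugfixed(line):
    """Parse a line, skipping anything that isn't valid JSON."""
    try:
        return json.loads(line)
    except json.JSONDecodeError:
        print(f"skipping invalid line: {line!r}")
        return None
```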

Let's run our dataflow again:

> python -m dataflow --sqlite-directory . --epoch-interval 0
{'user_id': 'a', 'paid_order_ids': [2, 1], 'unpaid_order_ids': []}
{'user_id': 'b', 'paid_order_ids': [4], 'unpaid_order_ids': [3]}

Notice how the system did not forget the information from the previous invocation; we still see that user a has paid order_ids 2 and 1.
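As a sanity check that doesn't require a bytewax install, the whole join can be simulated in plain Python over the sample events (including the bad line, which the bugfixed deserializer skips). The final per-user records match the two runs above:

```python
import json

EVENTS = [
    '{"user_id": "a", "type": "order", "order_id": 1}',
    '{"user_id": "a", "type": "order", "order_id": 2}',
    '{"user_id": "b", "type": "order", "order_id": 3}',
    '{"user_id": "a", "type": "payment", "order_id": 2}',
    '{"user_id": "b", "type": "order", "order_id": 4}',
    'FAIL HERE',  # the bad line that crashed the first run
    '{"user_id": "a", "type": "payment", "order_id": 1}',
    '{"user_id": "b", "type": "payment", "order_id": 4}',
]

carts = {}  # user_id -> {"paid_order_ids": [...], "unpaid_order_ids": [...]}
outputs = []
for line in EVENTS:
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        continue  # the bugfixed deserializer skips invalid lines
    cart = carts.setdefault(
        event["user_id"], {"paid_order_ids": [], "unpaid_order_ids": []}
    )
    if event["type"] == "order":
        cart["unpaid_order_ids"].append(event["order_id"])
    else:  # payment: move the order id from unpaid to paid
        cart["unpaid_order_ids"].remove(event["order_id"])
        cart["paid_order_ids"].append(event["order_id"])
    outputs.append(
        {"user_id": event["user_id"], **{k: list(v) for k, v in cart.items()}}
    )

print(outputs[-2])
print(outputs[-1])
```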