Recoverable Streaming Shopping Cart Application

In this example, we're going to build a small online order fulfillment system. It will join two event streams: one containing customer orders and another containing successful payments, and then emit which orders for each customer have been paid. We're also going to design it to be recoverable and restart-able after invalid data.

Skill level: Intermediate Time to complete: 15 min


Sample Data

Make a file named cart-join.json with the following data:

{"user_id": "a", "type": "order", "order_id": 1}
{"user_id": "a", "type": "order", "order_id": 2}
{"user_id": "b", "type": "order", "order_id": 3}
{"user_id": "a", "type": "payment", "order_id": 2}
{"user_id": "b", "type": "order", "order_id": 4}
{"user_id": "a", "type": "payment", "order_id": 1}
{"user_id": "b", "type": "payment", "order_id": 4}

Python modules bytewax

Your takeaway

This tutorial will show you how to build a recoverable streaming application that aggregates shoppers data into a shopping cart that is recoverable in the instance that it fails.

Step 1. Input

In a production application you would most likely be using something like kafka or redpanda as the input source. In this scenario we will build our own input source from the file we created earlier. To start we are going to use the manual input configuration with an input builder that doesn't support recovery yet. The builder will read each line of JSON, and emit a tuple of the line number and the parsed JSON. We'll use the line number later in this example when we work on recovery.

Step 2. Dataflow

A dataflow is the unit of workload. They are data parallel directed acyclic graphs that are made up of processing steps. We will walk through the construction of a dataflow to join orders together into our shopping cart.

We will start with an empty dataflow.

Our plan is to use the stateful map operator to actually do the join. All stateful operators require their input data to be a (key, value) tuple so that the system can route all keys so they access the same state. Let's add that key field using the user_id field present in every event.

Now onto the join itself. Stateful map needs two callbacks: One that builds the initial, empty state. And one that combines new items into the state and emits a value downstream.

We'll make a quick dictionary that holds the relevant data.

The constructor of the class can be re-used as the initial state builder.

Now we need the join logic, which will return two values: the updated state and the item to emit downstream. Since we'd like to continuously be emitting the most updated join info, we'll emit the state again.

The items that stateful operators emit also have the relevant key still attached, so in this case we have (user_id, joined_state). Let's format that into a dictionary for output.

Then we can serialize the final output as JSON and print it.

And finally, capture this output and send it to our output builder.

Step 3. Execution

At this point our dataflow is constructed, Now we can run our dataflow:

Cool, we're chugging along and then OH NO!

{"user_id": "a", "paid_order_ids": [], "unpaid_order_ids": [1]}
{"user_id": "a", "paid_order_ids": [], "unpaid_order_ids": [1, 2]}
{"user_id": "b", "paid_order_ids": [], "unpaid_order_ids": [3]}
{"user_id": "a", "paid_order_ids": [2], "unpaid_order_ids": [1]}
{"user_id": "b", "paid_order_ids": [], "unpaid_order_ids": [3, 4]}
Traceback (most recent call last):
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

Something went wrong! In this case it was that we had a non-JSON line FAIL HERE in the input, but you could imagine that Kafka consumer breaks or the VM is killed or something else bad happens!

We've also built up very valuable state in our stateful map operator and we don't want to pay the penalty of having to re-read our input all the way from the beginning to build it back up. Let's see how we could have implemented state recovery to allow that to happen in the future!

Step 4. Making our Dataflow Recoverable

Following our checklist in Bytewax's documentation on recovery we need to enhance our input builder with a few things.

First, we need the ability to resume our input from where we left off. When using the ManualInputConfig, we emit a state object, that we will be provided with as the argument resume_state on our next invocation.

Here we're using the line number from enumerate as our state, and we can use that to skip forward in the file to that line.

We need a recovery config that describes where to store the state and progress data for this worker. For this example we'll use a local SQLite database because we're running on a single machine.

Now if we run the dataflow, the internal state will be persisted at the end of each epoch so we can recover mid-way. Since we didn't run with any of the recovery systems activated last time, let's run the dataflow again with them enabled.

As expected, we have the same error.

{"user_id": "a", "paid_order_ids": [], "unpaid_order_ids": [1]}
{"user_id": "a", "paid_order_ids": [], "unpaid_order_ids": [1, 2]}
{"user_id": "b", "paid_order_ids": [], "unpaid_order_ids": [3]}
{"user_id": "a", "paid_order_ids": [2], "unpaid_order_ids": [1]}
{"user_id": "b", "paid_order_ids": [], "unpaid_order_ids": [3, 4]}
Traceback (most recent call last):
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

But this time, we've persisted state and the epoch of failure into the recovery store! If we fix whatever is causing the exception, we can resume the dataflow and still get the correct output. Let's fix up the input handler, reconstruct our dataflow and run it one more time.

Running the dataflow again will pickup very close to where we failed. In this case, the failure happened with an input on line 5, so it resumes from there. As the FAIL HERE string is ignored, there's no output when processing line 5.

{"user_id": "a", "paid_order_ids": [2, 1], "unpaid_order_ids": []}
{"user_id": "b", "paid_order_ids": [4], "unpaid_order_ids": [3]}

Notice how the system did not forget the information from previous lines; we still see that user a has order 1.