Recoverable Streaming Shopping Cart Application

In this example, we're going to build a small online order fulfillment system. It will join two events within a stream: one event type containing customer orders and another containing successful payments. The dataflow will emit completed orders for each customer that have been paid. It will also handle a failure event without crashing.

Skill level:Intermediate
Time to complete:15 min

Prerequisities

Sample Data

Make a file named cart-join.json with the following data:

{"user_id": "a", "type": "order", "order_id": 1}
{"user_id": "a", "type": "order", "order_id": 2}
{"user_id": "b", "type": "order", "order_id": 3}
{"user_id": "a", "type": "payment", "order_id": 2}
{"user_id": "b", "type": "order", "order_id": 4}
FAIL HERE
{"user_id": "a", "type": "payment", "order_id": 1}
{"user_id": "b", "type": "payment", "order_id": 4}

Python modules bytewax==0.19.*

Your takeaway

This tutorial will show you how to build a recoverable streaming application that aggregates shoppers data into a shopping cart that is recoverable in the instance that it fails.

Imports

First, let's discuss the necessary imports and setup for our application.

We import necessary modules like json for parsing data, and Bytewax-specific components for building the dataflow.

Deserialization Function

To ensure the data integrity and usability, we first define a function to safely deserialize incoming JSON data.

This function attempts to parse JSON data and checks for the necessary keys before returning them as a tuple. If parsing fails, it returns None and prints a warning.

State Management Class

We use a dataclass to manage the state of each user's shopping cart.

This class keeps track of unpaid and paid orders. The update method modifies the state based on the type of event, and the summarize method generates a summary of the state.

In the context of the ShoppingCartState class, an event refers to a single transaction or action taken by a user regarding their shopping cart. In the Bytewax stream processing setup, events are typically represented as dictionary objects derived from JSON data. Each event contains several key-value pairs that detail the action:

  • order_id: This is a unique identifier for an order. It is crucial for tracking the status of each order as it moves from unpaid to paid.
  • type: This specifies the nature of the event. It can either be "order", indicating the creation of a new order, or "payment", indicating the completion of a payment for an existing order.
  • user_id and other possible fields that help in further processing or analysis.

Event Handling in update Method

The update method in ShoppingCartState handles these events:

  • If the event type is "order", it adds the order to the unpaid_order_ids dictionary with the order_id as the key. This way, the order can be easily retrieved and updated upon receiving a payment.
  • If the event type is "payment", it checks if the order_id exists in the unpaid_order_ids. If it does, it moves the order to the paid_order_ids list, marking it as paid.
  • The method effectively transitions orders based on their payment status, keeping the state of each cart up-to-date.

Stateful Data Processing

Next, we define a state_manager function to manage state transitions and output data summaries.

This function initializes state if not already present, updates the state based on the input, and returns a summarized state.

The state_manager function is crucial for managing and maintaining the state throughout the lifecycle of the dataflow in a Bytewax application. It deals with state objects and the values processed in each step of the dataflow:

  • State Initialization: If there is no existing state (state is None), it initializes a new ShoppingCartState. This step is crucial for new users or sessions where previous state data is not available.

  • State Update: It calls state.update(value), where value is the event tuple (user_id, event) extracted and filtered from the incoming data. The tuple format is particularly useful here because it bundles related data (user and their event) together, ensuring that all necessary information for state updates is passed as a single unit.

  • State and Summary Return: The function returns a tuple state, state.sumarize(). The first element is the updated state object, useful for further stateful operations within the dataflow if needed. The second element,state.summarize(), provides a snapshot of the current state, specifically listing all paid and unpaid order IDs.

Setting Up the Dataflow

The dataflow processes input from a file and applies transformations and stateful operations.

  • Input is read from a JSON file.
  • Map applies safe_deserialize.
  • Filter removes any None results.
  • Stateful Map applies state_manager to manage and summarize state.

Finally, we add a formatting step to prepare the output and define the final output operation.

At this point our dataflow is constructed, and we can run it. Here we're setting our current directory as the path for our SQLite recovery store, and setting our epoch interval to 0, so that we can create a checkpoint of our state for every line in the file:

> python -m bytewax.run dataflow 

Skipping invalid data: FAIL HERE
Final summary for user a: {'paid_order_ids': [], 'unpaid_order_ids': [1]}
Final summary for user a: {'paid_order_ids': [], 'unpaid_order_ids': [1, 2]}
Final summary for user b: {'paid_order_ids': [], 'unpaid_order_ids': [3]}
Final summary for user a: {'paid_order_ids': [2], 'unpaid_order_ids': [1]}
Final summary for user b: {'paid_order_ids': [], 'unpaid_order_ids': [3, 4]}
Final summary for user a: {'paid_order_ids': [2, 1], 'unpaid_order_ids': []}
Final summary for user b: {'paid_order_ids': [4], 'unpaid_order_ids': [3]}

The output shows the final summary for each user, including paid and unpaid order IDs. It was also able to handle the invalid data line without crashing.

Summary

Recoverable dataflows are key to any production system. This tutorial demonstrated how you can do this through building a shopping cart application.

We want to hear from you!

If you have any trouble with the process or have ideas about how to improve this document, come talk to us in the #troubleshooting Slack channel!

Share your tutorial progress!

Where to next?
Check other guides