Recoverable Streaming Shopping Cart Application
In this example, we're going to build a small online order fulfillment system. It will join two events within a stream: one event type containing customer orders and another containing successful payments. The dataflow will emit completed orders for each customer that have been paid. It will also handle a failure event without crashing.
Prerequisites
Sample Data
Make a file named cart-join.json with the following data:
{"user_id": "a", "type": "order", "order_id": 1}
{"user_id": "a", "type": "order", "order_id": 2}
{"user_id": "b", "type": "order", "order_id": 3}
{"user_id": "a", "type": "payment", "order_id": 2}
{"user_id": "b", "type": "order", "order_id": 4}
FAIL HERE
{"user_id": "a", "type": "payment", "order_id": 1}
{"user_id": "b", "type": "payment", "order_id": 4}
Python modules
bytewax==0.19.*
Your takeaway
This tutorial will show you how to build a streaming application that aggregates shoppers' data into a shopping cart and can recover its state after a failure.
Imports
First, let's discuss the necessary imports and setup for our application.
We import necessary modules like json for parsing data, and Bytewax-specific components for building the dataflow.
Deserialization Function
To ensure data integrity and usability, we first define a function to safely deserialize incoming JSON data.
This function attempts to parse JSON data and checks for the necessary keys before returning them as a tuple. If parsing fails, it returns None and prints a warning.
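As a minimal sketch of the behavior described above, the function might look like the following. The required keys are an assumption taken from the sample data, and the warning text is modeled on the output shown later in this tutorial.

```python
import json

# Assumed from the sample data: every valid event carries these keys.
REQUIRED_KEYS = ("user_id", "type", "order_id")


def safe_deserialize(data):
    """Return (user_id, event) for a valid line, or None for anything else."""
    try:
        event = json.loads(data)
    except json.JSONDecodeError:
        print(f"Skipping invalid data: {data}")
        return None
    # Valid JSON can still have the wrong shape (a list, or a missing key).
    if not isinstance(event, dict) or any(k not in event for k in REQUIRED_KEYS):
        print(f"Skipping invalid data: {data}")
        return None
    return (event["user_id"], event)
```

Returning None instead of raising lets a later filter step drop bad lines, so one malformed record does not stop the stream.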
State Management Class
We use a dataclass to manage the state of each user's shopping cart.
This class keeps track of unpaid and paid orders. The update method modifies the state based on the type of event, and the summarize method generates a summary of the state.
In the context of the ShoppingCartState class, an event refers to a single transaction or action taken by a user regarding their shopping cart. In the Bytewax stream processing setup, events are typically represented as dictionary objects derived from JSON data. Each event contains several key-value pairs that detail the action:
- order_id: This is a unique identifier for an order. It is crucial for tracking the status of each order as it moves from unpaid to paid.
- type: This specifies the nature of the event. It can either be "order", indicating the creation of a new order, or "payment", indicating the completion of a payment for an existing order.
- user_id and other possible fields that help in further processing or analysis.
Event Handling in update Method
The update method in ShoppingCartState handles these events:
- If the event type is "order", it adds the order to the unpaid_order_ids dictionary with the order_id as the key. This way, the order can be easily retrieved and updated upon receiving a payment.
- If the event type is "payment", it checks if the order_id exists in unpaid_order_ids. If it does, it moves the order to the paid_order_ids list, marking it as paid.
In this way, the method transitions orders based on their payment status, keeping the state of each cart up to date.
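The class described in this section can be sketched as follows. The field and method names come from the text; the exact bodies are an assumption about how the described behavior could be implemented.

```python
from dataclasses import dataclass, field


@dataclass
class ShoppingCartState:
    # order_id -> order event, for orders still awaiting payment
    unpaid_order_ids: dict = field(default_factory=dict)
    # order events that have been paid, in the order payments arrived
    paid_order_ids: list = field(default_factory=list)

    def update(self, event):
        order_id = event["order_id"]
        if event["type"] == "order":
            self.unpaid_order_ids[order_id] = event
        elif event["type"] == "payment" and order_id in self.unpaid_order_ids:
            # Move the order from unpaid to paid; unknown payments are ignored.
            self.paid_order_ids.append(self.unpaid_order_ids.pop(order_id))

    def summarize(self):
        return {
            "paid_order_ids": [o["order_id"] for o in self.paid_order_ids],
            "unpaid_order_ids": list(self.unpaid_order_ids),
        }


cart = ShoppingCartState()
cart.update({"user_id": "a", "type": "order", "order_id": 1})
cart.update({"user_id": "a", "type": "order", "order_id": 2})
cart.update({"user_id": "a", "type": "payment", "order_id": 2})
print(cart.summarize())  # {'paid_order_ids': [2], 'unpaid_order_ids': [1]}
```

Using default_factory gives each cart its own dictionary and list, so state is never shared between users by accident.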
Stateful Data Processing
Next, we define a state_manager function to manage state transitions and output data summaries.
This function initializes state if not already present, updates the state based on the input, and returns a summarized state.
The state_manager function maintains state throughout the lifecycle of the dataflow in a Bytewax application. It deals with state objects and the values processed in each step of the dataflow:
- State Initialization: If there is no existing state (state is None), it initializes a new ShoppingCartState. This step is crucial for new users or sessions where previous state data is not available.
- State Update: It calls state.update(value), where value is the event from the (user_id, event) tuple extracted and filtered from the incoming data. The tuple format is useful here because it bundles related data (a user and their event) together, so each event is applied to the state for the right user.
- State and Summary Return: The function returns the tuple (state, state.summarize()). The first element is the updated state object, which is carried forward to the next event. The second element, state.summarize(), provides a snapshot of the current state, listing all paid and unpaid order IDs.
Setting Up the Dataflow
The dataflow processes input from a file and applies transformations and stateful operations.
- Input is read from a JSON file.
- Map applies safe_deserialize.
- Filter removes any None results.
- Stateful Map applies state_manager to manage and summarize state.
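To make the shape of these steps concrete, here is a plain-Python emulation of map, filter, and a per-key stateful map. It deliberately does not use the Bytewax API; parse, count_orders, and run are hypothetical names, and count_orders is a toy mapper standing in for state_manager.

```python
import json


def parse(line):
    """Simplified stand-in for safe_deserialize: (user_id, event) or None."""
    try:
        event = json.loads(line)
        return event["user_id"], event
    except (json.JSONDecodeError, KeyError, TypeError):
        return None


def count_orders(state, event):
    """Toy mapper standing in for state_manager: running order count per user."""
    count = state or 0
    if event["type"] == "order":
        count += 1
    return count, count


def run(lines, mapper):
    states = {}  # one state slot per key, as a keyed stateful step would hold
    outputs = []
    parsed = map(parse, lines)                              # Map
    valid = filter(lambda item: item is not None, parsed)   # Filter
    for key, value in valid:                                # Stateful Map
        states[key], emitted = mapper(states.get(key), value)
        outputs.append((key, emitted))
    return outputs


lines = [
    '{"user_id": "a", "type": "order", "order_id": 1}',
    '{"user_id": "b", "type": "order", "order_id": 3}',
    "FAIL HERE",
    '{"user_id": "a", "type": "order", "order_id": 2}',
]
results = run(lines, count_orders)
print(results)  # [('a', 1), ('b', 1), ('a', 2)]
```

The point of the emulation is that state is looked up by key, so events for user a and user b never touch each other's state.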
Finally, we add a formatting step to prepare the output and define the final output operation.
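A formatting step consistent with the output shown in this tutorial could be as small as the sketch below. The name format_output is hypothetical, and the step is assumed to receive a (user_id, summary) tuple.

```python
def format_output(user_summary):
    """Turn a (user_id, summary) pair into one printable line."""
    user_id, summary = user_summary
    return f"Final summary for user {user_id}: {summary}"


line = format_output(("a", {"paid_order_ids": [2], "unpaid_order_ids": [1]}))
print(line)
# Final summary for user a: {'paid_order_ids': [2], 'unpaid_order_ids': [1]}
```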
At this point our dataflow is constructed, and we can run it. Here we're setting our current directory as the path for our SQLite recovery store, and setting our epoch interval to 0, so that we can create a checkpoint of our state for every line in the file:
> python -m bytewax.run dataflow
Skipping invalid data: FAIL HERE
Final summary for user a: {'paid_order_ids': [], 'unpaid_order_ids': [1]}
Final summary for user a: {'paid_order_ids': [], 'unpaid_order_ids': [1, 2]}
Final summary for user b: {'paid_order_ids': [], 'unpaid_order_ids': [3]}
Final summary for user a: {'paid_order_ids': [2], 'unpaid_order_ids': [1]}
Final summary for user b: {'paid_order_ids': [], 'unpaid_order_ids': [3, 4]}
Final summary for user a: {'paid_order_ids': [2, 1], 'unpaid_order_ids': []}
Final summary for user b: {'paid_order_ids': [4], 'unpaid_order_ids': [3]}
The output shows an updated summary for each user after every event, including paid and unpaid order IDs. The dataflow also handled the invalid data line without crashing.
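To illustrate what checkpointing buys you, the following sketch processes part of the sample events, snapshots the per-user state, and resumes from the snapshot as if the process had restarted. It is an illustration only: pickle stands in for a recovery store, the dict-based state is a simplification of the cart class, and none of this reflects how Bytewax stores snapshots internally.

```python
import pickle


def apply_event(state, event):
    """Simplified cart update on a plain dict (stand-in for the cart class)."""
    state = state or {"paid": [], "unpaid": []}
    order_id = event["order_id"]
    if event["type"] == "order":
        state["unpaid"].append(order_id)
    elif event["type"] == "payment" and order_id in state["unpaid"]:
        state["unpaid"].remove(order_id)
        state["paid"].append(order_id)
    return state


def process(events, states=None):
    """Apply events to per-user state, optionally starting from saved state."""
    states = {} if states is None else states
    for event in events:
        key = event["user_id"]
        states[key] = apply_event(states.get(key), event)
    return states


events = [
    {"user_id": "a", "type": "order", "order_id": 1},
    {"user_id": "a", "type": "order", "order_id": 2},
    {"user_id": "b", "type": "order", "order_id": 3},
    {"user_id": "a", "type": "payment", "order_id": 2},
    {"user_id": "b", "type": "order", "order_id": 4},
    {"user_id": "a", "type": "payment", "order_id": 1},
    {"user_id": "b", "type": "payment", "order_id": 4},
]

uninterrupted = process(events)

snapshot = pickle.dumps(process(events[:4]))   # checkpoint after four events
restored = pickle.loads(snapshot)              # simulated restart
resumed = process(events[4:], restored)        # continue where we left off

print(resumed == uninterrupted)  # True
```

Because the state is restored before the remaining events are applied, the resumed run ends in the same place as a run that never stopped.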
Summary
Recoverable dataflows are key to any production system. This tutorial demonstrated how to build one, using a shopping cart application as the example.
We want to hear from you!
If you have any trouble with the process or have ideas about how to improve this document, come talk to us in the #troubleshooting Slack channel!