Understanding Bytewax Operators for Distributed Data Processing

By Laura Funderburk & Jonas Best

Distributed data processing - the approach of handling and analyzing data across multiple interconnected devices or nodes - has become a critical part of modern data architectures, where scalability, fault tolerance, and real-time data flows are essential. One of the frameworks in this space is Bytewax, a Python-based framework for building distributed dataflows. If you are familiar with stream processing frameworks like Apache Flink or Kafka Streams, Bytewax offers a more Pythonic approach with powerful features for scaling real-time processing workloads. It is based on Rust's timely-dataflow, but you get to work entirely within Python thanks to its Python API.

In this blog post, we’ll explore Bytewax operators using the Bytewax Operator Cheat Sheet. This cheat sheet serves as a handy reference for data engineers and developers who want to quickly apply operators to stream processing tasks.

[Image: Bytewax Operator Cheat Sheet]

Understanding Bytewax Operators

Operators in Bytewax are functions that operate on elements in the stream and can be classified into different types, depending on the type of transformation.

When working with distributed data streams and pipelines, the concept of state becomes crucial, especially for tasks that require tracking information across multiple events. In simple terms, state refers to data that persists across operations and stream elements, which allows for more complex and meaningful transformations beyond stateless operations like map or filter.

Let’s break this down in an intuitive way.

Stateless vs Stateful Transformations

Stateless Transformations operate independently on each incoming data element without any memory of the past. For example, using a map operator to convert text to uppercase or filtering numbers based on a condition.

Stateful Transformations, on the other hand, require memory to maintain information about previous elements. This memory is what we call the state. Every time a new event comes in, the state is updated.

Example: Counting Elements Using State

Imagine you have a stream of events and you want to count how many times an event has occurred. To do this, you need to remember how many events came before the current one, which means you need to maintain state.

  • Initially, the state might start as 0 (no events).
  • Each time a new event comes in, the state increases by 1.
  • The current count is stored as the state and updated as more events come in.
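
The three steps above can be sketched in plain Python, without Bytewax, as a function that takes the current state and an event and returns the updated state together with the value to emit. The function name and the sample events are hypothetical and only serve this illustration.

```python
def count_event(state, event):
    """Return the updated count and the item to emit downstream."""
    new_state = state + 1            # each new event increases the state by 1
    return new_state, (event, new_state)


state = 0                            # initially, no events have been seen
emitted = []
for event in ["click", "click", "purchase"]:
    # The state returned by one call is fed into the next call
    state, output = count_event(state, event)
    emitted.append(output)

print(emitted)  # [('click', 1), ('click', 2), ('purchase', 3)]
print(state)    # 3
```

The important part is that the state outlives any single event: it is carried from one call to the next, which is the job a stateful operator does for you.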

The following table summarizes the current operators:

[Table 1: overview of Bytewax operators]

For transformations on streaming (unbounded) data, Bytewax uses windowing. We will explore windowing in our next blog post.

Let's take a look at a few examples.

Processing smoothie orders with Bytewax operators

Let's take a simple example of smoothie orders - here is the dataset we will work with:

[Table 2: smoothie orders dataset]
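
To follow along, you need a smoothie_orders.csv file in your working directory. The sketch below writes one using only the standard library. The column names and the four rows are taken from the outputs printed later in this post, so treat the file as a reconstruction of the dataset rather than the original.

```python
import csv

# Column names and rows as they appear in the example outputs in this post
FIELDNAMES = ["order_id", "time", "order_requested", "ingredients"]
ROWS = [
    ("1", "2024-08-29 08:00:00", "Green Machine",
     "Spinach;Banana;Almond Milk;Chia Seeds"),
    ("2", "2024-08-29 08:05:00", "Berry Blast",
     "Strawberry;Blueberry;Greek Yogurt;Honey"),
    ("3", "2024-08-29 08:10:00", "Tropical Twist",
     "Mango;Pineapple;Coconut Water;Flax Seeds"),
    ("4", "2024-08-29 08:15:00", "Protein Power",
     "Peanut Butter;Banana;Protein Powder;Almond Milk"),
]

# newline="" lets the csv module control line endings itself
with open("smoothie_orders.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(FIELDNAMES)
    writer.writerows(ROWS)

# Read the file back to confirm each row parses into a dictionary
with open("smoothie_orders.csv", newline="") as f:
    loaded = list(csv.DictReader(f))

print(len(loaded))  # 4
```

Note that the ingredients are stored as a single semicolon-separated string, and that every value read from a CSV file is a string.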

Before we get started, let's install Bytewax in a virtual environment:

python -m venv venv
source venv/bin/activate
pip install bytewax==0.21.0

We're now ready! Let's jump in.

Example 1: Filter orders based on a specific ingredient (e.g., "Banana")

The dataflow will search for orders whose smoothies included bananas.

The dataflow uses the input operator to take the data in, followed by the filter operator which checks whether "Banana" is found under the 'ingredients' column. We can use the inspect operator to quickly check the results.


from datetime import datetime, timedelta
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import run_main
from bytewax.connectors.files import CSVSource


# Initialize the dataflow
flow = Dataflow("init_smoothie")

# Create the input source from the CSV file
orders = op.input("orders", flow, CSVSource("smoothie_orders.csv"))


def contains_banana(order):
    return "Banana" in order['ingredients']

banana_orders = op.filter("filter_banana", orders, contains_banana)

# Note: to send results to the terminal or another Bytewax sink
# (https://bytewax.io/connectors), use the output operator, for example:
# from bytewax.connectors.stdio import StdOutSink
# op.output("return_results", banana_orders, StdOutSink())

op.inspect("filter_results", banana_orders)
run_main(flow)

This returns the following:

init_smoothie.filter_results: {'order_id': '1', 'time': '2024-08-29 08:00:00', 'order_requested': 'Green Machine', 'ingredients': 'Spinach;Banana;Almond Milk;Chia Seeds'}
init_smoothie.filter_results: {'order_id': '4', 'time': '2024-08-29 08:15:00', 'order_requested': 'Protein Power', 'ingredients': 'Peanut Butter;Banana;Protein Powder;Almond Milk'}
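
One detail worth noting: because the ingredients column is a single semicolon-separated string, "Banana" in order['ingredients'] is a substring check. That works for this dataset, but it would also match an ingredient such as "Banana Chips". A stricter predicate, sketched below with a hypothetical has_ingredient helper and a made-up second order, splits the string first and compares whole ingredient names.

```python
def has_ingredient(order, wanted):
    """Return True only if `wanted` is one of the listed ingredients."""
    ingredients = [item.strip() for item in order["ingredients"].split(";")]
    return wanted in ingredients


green_machine = {"ingredients": "Spinach;Banana;Almond Milk;Chia Seeds"}
# Hypothetical order, not part of the dataset, to show the difference
crunchy = {"ingredients": "Oats;Banana Chips;Honey"}

print(has_ingredient(green_machine, "Banana"))  # True
print(has_ingredient(crunchy, "Banana"))        # False
print("Banana" in crunchy["ingredients"])       # True (substring match)
```

To use it with the filter operator, you could pass lambda order: has_ingredient(order, "Banana") as the predicate.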

Example 2: Enriching data to incorporate price and total price

Let's enrich our dataset with mock data. We will add the prices and then calculate total price by adding tax.


# Initialize the dataflow
flow = Dataflow("smoothie_orders_flow_key")

# Create the input source from the CSV file
orders = op.input("orders", flow, CSVSource("smoothie_orders.csv"))

def mock_pricing_service(smoothie_name):
    prices = {
        "Green Machine": 5.99,
        "Berry Blast": 6.49,
        "Tropical Twist": 7.49,
        "Protein Power": 8.99
    }
    return prices.get(smoothie_name, 0)

def enrich_with_price(cache, order):
    order['price'] = cache.get(order['order_requested'])
    return order

# Enrich the data to add the price
enriched_orders = op.enrich_cached(
    "enrich_with_price",
    orders,
    mock_pricing_service,
    enrich_with_price
)


op.inspect("inspect_final", enriched_orders)

# Run the dataflow
run_main(flow)

This returns the following:

smoothie_orders_flow_key.inspect_final: {'order_id': '1', 'time': '2024-08-29 08:00:00', 'order_requested': 'Green Machine', 'ingredients': 'Spinach;Banana;Almond Milk;Chia Seeds', 'price': 5.99}
smoothie_orders_flow_key.inspect_final: {'order_id': '2', 'time': '2024-08-29 08:05:00', 'order_requested': 'Berry Blast', 'ingredients': 'Strawberry;Blueberry;Greek Yogurt;Honey', 'price': 6.49}
smoothie_orders_flow_key.inspect_final: {'order_id': '3', 'time': '2024-08-29 08:10:00', 'order_requested': 'Tropical Twist', 'ingredients': 'Mango;Pineapple;Coconut Water;Flax Seeds', 'price': 7.49}
smoothie_orders_flow_key.inspect_final: {'order_id': '4', 'time': '2024-08-29 08:15:00', 'order_requested': 'Protein Power', 'ingredients': 'Peanut Butter;Banana;Protein Powder;Almond Milk', 'price': 8.99}
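
The idea behind enriching through a cache is that the lookup function only runs when a key has not been seen yet, and later orders for the same smoothie reuse the stored value. The sketch below illustrates that idea in plain Python. SimpleCache is a simplified stand-in written for this post, not Bytewax's own cache, and it leaves out expiry of old entries.

```python
class SimpleCache:
    """Minimal read-through cache: call the getter only on a miss."""

    def __init__(self, getter):
        self._getter = getter
        self._values = {}
        self.misses = 0  # how many times the getter had to be called

    def get(self, key):
        if key not in self._values:
            self.misses += 1
            self._values[key] = self._getter(key)
        return self._values[key]


PRICES = {"Green Machine": 5.99, "Berry Blast": 6.49}


def lookup_price(smoothie_name):
    # Stands in for a slow call to an external pricing service
    return PRICES.get(smoothie_name, 0)


cache = SimpleCache(lookup_price)
sample_orders = [
    {"order_requested": "Green Machine"},
    {"order_requested": "Berry Blast"},
    {"order_requested": "Green Machine"},
]

# Build new dictionaries rather than mutating the input orders
enriched = [
    {**order, "price": cache.get(order["order_requested"])}
    for order in sample_orders
]

print([order["price"] for order in enriched])  # [5.99, 6.49, 5.99]
print(cache.misses)                            # 2
```

Three orders result in only two lookups, which matters when the getter is a network call rather than a dictionary.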

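Let's compute the total price by adding a 15% tax. Since step IDs must be unique within a dataflow, the inspect step below takes the place of the earlier inspect_final step rather than being added alongside it.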


def calculate_total_price(order):
    # Assume a quantity of 1 per order for simplicity, and add a 15% tax
    order['total_price'] = round(float(order['price']) * 1.15, ndigits=2)
    return order

total_price_orders = op.map("calculate_total_price", 
                            enriched_orders, calculate_total_price)

op.inspect("inspect_final", total_price_orders)

# Run the dataflow
run_main(flow)

This returns:

smoothie_orders_flow_key.inspect_final: {'order_id': '1', 'time': '2024-08-29 08:00:00', 'order_requested': 'Green Machine', 'ingredients': 'Spinach;Banana;Almond Milk;Chia Seeds', 'price': 5.99, 'total_price': 6.89}
smoothie_orders_flow_key.inspect_final: {'order_id': '2', 'time': '2024-08-29 08:05:00', 'order_requested': 'Berry Blast', 'ingredients': 'Strawberry;Blueberry;Greek Yogurt;Honey', 'price': 6.49, 'total_price': 7.46}
smoothie_orders_flow_key.inspect_final: {'order_id': '3', 'time': '2024-08-29 08:10:00', 'order_requested': 'Tropical Twist', 'ingredients': 'Mango;Pineapple;Coconut Water;Flax Seeds', 'price': 7.49, 'total_price': 8.61}
smoothie_orders_flow_key.inspect_final: {'order_id': '4', 'time': '2024-08-29 08:15:00', 'order_requested': 'Protein Power', 'ingredients': 'Peanut Butter;Banana;Protein Powder;Almond Milk', 'price': 8.99, 'total_price': 10.34}

Let's now take a look at a stateful transformation example.

Example 3: Counting ingredients through stateful transformations

In this example, we will build a class that lets us count the ingredients used while keeping track of state. We will use Bytewax's StatefulBatchLogic, an abstract class for defining the logic of a stateful operator.


from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import run_main, TestingSource
from bytewax.operators import StatefulBatchLogic

# Define the custom stateful logic class to count each unique ingredient
class IngredientCountLogic(StatefulBatchLogic):
    def __init__(self):
        # Initialize the ingredient count dictionary to keep track of cumulative counts
        self.ingredient_counts = {}

    def on_batch(self, batch):
        # Iterate over the batch and count each ingredient's occurrences
        for event in batch:
            if isinstance(event, dict):
                ingredients = event['ingredients']
                for ingredient in ingredients:
                    # Update the cumulative count for each ingredient
                    if ingredient in self.ingredient_counts:
                        self.ingredient_counts[ingredient] += 1
                    else:
                        self.ingredient_counts[ingredient] = 1
            else:
                raise TypeError(f"Expected event to be a dictionary, got {type(event)} instead.")

        # Emit a copy of the counts so far; False tells Bytewax to retain
        # this logic (and its state) for the key
        return [dict(self.ingredient_counts)], False

    def snapshot(self):
        # Return the current state (ingredient counts) to be saved
        return self.ingredient_counts

    def restore(self, snapshot):
        # Restore the state (ingredient counts) from the snapshot
        self.ingredient_counts = snapshot


# Define the smoothie orders - note orders with the same ID
# represent orders made by the same customer
smoothie_orders = [
    {"order_id": "001", "ingredients": ["banana", "strawberry", "yogurt"]},
    {"order_id": "001", "ingredients": ["spinach", "kale", "apple", "banana"]},
    {"order_id": "002", "ingredients": ["blueberry", "banana", "almond milk"]},
    {"order_id": "002", "ingredients": ["apple", "orange", "carrot", "ginger"]},
]

# Create a dataflow object
flow = Dataflow("smoothie_ingredient_count")

# Create an input operator using the smoothie orders source
orders = op.input("orders", flow, TestingSource(smoothie_orders))

# Extract the key for each order (order ID)
key_on = op.key_on("key_on_order_id", orders, lambda x: x['order_id'])

# Inspect the input for debugging purposes
op.inspect("orders_keyed", key_on)

run_main(flow)

Let's see how the data shape has changed:

smoothie_ingredient_count.orders_keyed: ('001', {'order_id': '001', 'ingredients': ['banana', 'strawberry', 'yogurt']})
smoothie_ingredient_count.orders_keyed: ('001', {'order_id': '001', 'ingredients': ['spinach', 'kale', 'apple', 'banana']})
smoothie_ingredient_count.orders_keyed: ('002', {'order_id': '002', 'ingredients': ['blueberry', 'banana', 'almond milk']})
smoothie_ingredient_count.orders_keyed: ('002', {'order_id': '002', 'ingredients': ['apple', 'orange', 'carrot', 'ginger']})

Each item in the stream is now a tuple of the key (the order ID) and the original order dictionary.

Let's apply our logic class through the stateful_batch operator.


# Apply the stateful batch operator to count each unique ingredient
ingredient_counter = op.stateful_batch("ingredient_counter", key_on, lambda _: IngredientCountLogic())

# Inspect the output
op.inspect("out", ingredient_counter)

# Run the flow
run_main(flow)

This returns:

smoothie_ingredient_count.out: ('001', {'banana': 1, 'strawberry': 1, 'yogurt': 1})
smoothie_ingredient_count.out: ('001', {'banana': 2, 'strawberry': 1, 'yogurt': 1, 'spinach': 1, 'kale': 1, 'apple': 1})
smoothie_ingredient_count.out: ('002', {'blueberry': 1, 'banana': 1, 'almond milk': 1})
smoothie_ingredient_count.out: ('002', {'blueberry': 1, 'banana': 1, 'almond milk': 1, 'apple': 1, 'orange': 1, 'carrot': 1, 'ginger': 1})

This dataflow demonstrates how state can be managed for stream processing tasks. The key_on operator is critical for stateful operations, as it allows Bytewax to manage state on a per-key basis. Although this example only counts how often each ingredient is used per order ID, the same principles apply to more complex stateful operations, such as aggregating results for specific keys in distributed dataflows.
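
To make "state on a per-key basis" concrete, here is a plain Python sketch, without Bytewax, that keeps one independent counter per key and processes the same keyed orders. The variable names are illustrative. In the real dataflow, Bytewax creates and tracks one IngredientCountLogic instance per key for you.

```python
from collections import Counter, defaultdict

# (key, ingredients) pairs, mirroring the keyed stream from Example 3
keyed_orders = [
    ("001", ["banana", "strawberry", "yogurt"]),
    ("001", ["spinach", "kale", "apple", "banana"]),
    ("002", ["blueberry", "banana", "almond milk"]),
    ("002", ["apple", "orange", "carrot", "ginger"]),
]

# One Counter per key: state for "001" never mixes with state for "002"
state_by_key = defaultdict(Counter)

outputs = []
for key, ingredients in keyed_orders:
    state_by_key[key].update(ingredients)
    # Emit a copy so later updates do not change what was already emitted
    outputs.append((key, dict(state_by_key[key])))

for item in outputs:
    print(item)
```

After the second order, banana has a count of 2 under key "001", while key "002" starts from an empty counter and counts its own banana once. This matches the output of the dataflow above.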

Why Use Bytewax Operators?

The advantage of using Bytewax operators is the simplicity and power they bring to stream processing tasks. You can easily chain these operators to build complex data pipelines that scale effortlessly across distributed systems. Additionally, Bytewax provides flexibility in managing state, windowing, and fault tolerance, making it an excellent choice for building real-time analytics or event-driven applications.

Here are a few key benefits of using Bytewax operators:

  • Pythonic API: Bytewax provides an intuitive Python API, which allows developers to focus on writing business logic without worrying about the intricacies of distributed systems.
  • Stateful Processing: Bytewax operators offer state management across stream elements, enabling complex data transformations and aggregations.
  • Fault Tolerance: Built on top of Rust's Timely Dataflow, Bytewax ensures that your data pipelines are reliable and fault-tolerant.

These examples used finite data, but many applications, particularly real-time ones, require continuous processing. In the next blog post, we will look at windowing with examples.


Laura Funderburk

Senior Developer Advocate
Laura Funderburk holds a B.Sc. in Mathematics from Simon Fraser University and has extensive work experience as a data scientist. She is passionate about leveraging open source for MLOps and DataOps and is dedicated to outreach and education.

Jonas Best

Chief of Staff
Jonas brings extensive experience from Accenture and Monitor Deloitte, where he managed projects at the intersection of technology and business. Before joining Bytewax, he attended business school at the University of St. Gallen and HEC Paris. He is crucial in coordinating Bytewax's strategic efforts and ensuring seamless operations.