Design patterns

Here are some tips, tricks, and Python features that answer common questions and help you write succinct, easy-to-read code.

Quick Logic Functions

All of the above examples define named custom logic functions and then pass them to an operator. Any callable value can be used as-is, though!

This means you can pass existing callables, such as built-in functions, constructors, and methods, directly to an operator to make your code more concise.

You can also use lambdas to quickly define one-off anonymous functions for simple custom logic.
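As a rough illustration of what "any callable" covers, the sketch below passes a built-in function, a constructor, a method referenced on its class, and a lambda to Python's built-in `map`. Using `map` here is an assumption made to keep the example free of dependencies; it only stands in for an operator that accepts a logic function, and the variable names are illustrative.

```python
# Plain-Python sketch: each argument to map() below is a callable of the
# kind that could also be handed to an operator as its logic function.
words = ["1", "22", "333"]

# Built-in function: len is called once per item.
lengths = list(map(len, words))

# Constructor: int("22") builds a new int from each string.
numbers = list(map(int, words))

# Method referenced on its class: str.upper(s) is the same as s.upper().
shouted = list(map(str.upper, ["a", "b"]))

# Lambda: a one-off anonymous function for simple custom logic.
doubled = list(map(lambda n: n * 2, numbers))

print(lengths, numbers, shouted, doubled)
```

In each case the callable is passed by name, without parentheses, so it is invoked once per item rather than once up front.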

The following sets of examples are equivalent.

from bytewax.dataflow import Dataflow
from bytewax.testing import run_main, TestingInput
from bytewax.connectors.stdio import StdOutput
from bytewax.window import SystemClockConfig, TumblingWindow
from datetime import timedelta, datetime, timezone

For flat map:

def split_sentence(sentence):
    return sentence.split()


inp = ["hello world"]
flow = Dataflow()
flow.input("inp", TestingInput(inp))

# Operate on input
flow.flat_map(split_sentence)

flow.output("out", StdOutput())
run_main(flow)
hello
world

inp = ["hello world"]
flow = Dataflow()
flow.input("inp", TestingInput(inp))

# Operate on input
flow.flat_map(lambda s: s.split())

flow.output("out", StdOutput())
run_main(flow)
hello
world

inp = ["hello world"]
flow = Dataflow()
flow.input("inp", TestingInput(inp))

# Operate on input
flow.flat_map(str.split)

flow.output("out", StdOutput())
run_main(flow)
hello
world

For reduce window:

def add_to_list(acc, items):
    # Extend the accumulated list in place and return it.
    acc.extend(items)
    return acc


clock = SystemClockConfig()
window = TumblingWindow(
    length=timedelta(seconds=5), align_to=datetime(2023, 1, 1, tzinfo=timezone.utc)
)

inp = [("a", ["x"]), ("a", ["y"])]
flow = Dataflow()
flow.input("inp", TestingInput(inp))

# Operate on input
flow.reduce_window("reduce", clock, window, add_to_list)
flow.output("out", StdOutput())

run_main(flow)
('a', ['x', 'y'])

inp = [("a", ["x"]), ("a", ["y"])]
flow = Dataflow()
flow.input("inp", TestingInput(inp))

# Operate on input
flow.reduce_window("reduce", clock, window, lambda l1, l2: l1 + l2)
flow.output("out", StdOutput())

run_main(flow)
('a', ['x', 'y'])

import operator

inp = [("a", ["x"]), ("a", ["y"])]
flow = Dataflow()
flow.input("inp", TestingInput(inp))

# Operate on input
flow.reduce_window("reduce", clock, window, operator.add)
flow.output("out", StdOutput())

run_main(flow)
('a', ['x', 'y'])
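The three reducers return the same combined list, but they differ in one detail worth knowing: the `extend`-based version modifies its first argument in place, while `+` and `operator.add` build a new list. The sketch below shows this in plain Python. `functools.reduce` stands in for the windowed reduce, which is an assumption for illustration and not a claim about how Bytewax calls the reducer internally; `combine` is a hypothetical helper.

```python
import operator
from functools import reduce


def add_to_list(acc, items):
    # Mutates `acc` in place and returns the same list object.
    acc.extend(items)
    return acc


def combine(reducer):
    # Hypothetical helper: builds fresh input lists on every call so that
    # one run cannot affect the next.
    values = [["x"], ["y"]]
    result = reduce(reducer, values)
    # Return the combined list and the (possibly modified) first input.
    return result, values[0]


named_result, named_first = combine(add_to_list)
lambda_result, lambda_first = combine(lambda l1, l2: l1 + l2)
operator_result, operator_first = combine(operator.add)

print(named_result, named_first)
print(lambda_result, lambda_first)
print(operator_result, operator_first)
```

If the input lists are reused elsewhere, the non-mutating forms are the safer choice.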

Subflows

If you find yourself repeating a series of steps in your dataflows, or want to give some steps a descriptive name, you can group them into a subflow function that adds that sequence of steps to a dataflow.

You can then call the subflow function wherever you need that step sequence. Nothing special is involved; it is an ordinary Python function call.

def user_reducer(all_events, new_events):
    return all_events + new_events


def collect_user_events(flow, clock, window):
    # Input item: an event dict. Output: (user_id, [event_type])
    flow.map(lambda e: (e["user_id"], [e["type"]]))
    # Combine each user's lists within a window: (user_id, events_for_user)
    flow.reduce_window("reducer", clock, window, user_reducer)
    # Convert the tuple back into a dict for output.
    flow.map(lambda e: {"user_id": e[0], "all_events": e[1]})


clock = SystemClockConfig()
window = TumblingWindow(
    length=timedelta(seconds=5), align_to=datetime(2023, 1, 1, tzinfo=timezone.utc)
)

inp = [{"user_id": "1", "type": "login"}, {"user_id": "1", "type": "logout"}]
flow = Dataflow()
flow.input("inp", TestingInput(inp))

# Operate on input
collect_user_events(flow, clock, window)
flow.output("out", StdOutput())

run_main(flow)
{'user_id': '1', 'all_events': ['login', 'logout']}
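To make the shape of the data at each step of the subflow easier to follow, here is a plain-Python trace of the same three steps. It is a sketch only: `trace_collect_user_events` is a hypothetical helper, it runs on an ordinary list instead of a dataflow, and it ignores windowing by treating all events as if they fell into a single window.

```python
def user_reducer(all_events, new_events):
    return all_events + new_events


def trace_collect_user_events(events):
    # Step 1: event dict -> (user_id, [event_type])
    keyed = [(e["user_id"], [e["type"]]) for e in events]

    # Step 2: combine the lists for each user_id with the reducer.
    # Assumption: every event belongs to the same window.
    reduced = {}
    for user_id, types in keyed:
        if user_id in reduced:
            reduced[user_id] = user_reducer(reduced[user_id], types)
        else:
            reduced[user_id] = types

    # Step 3: (user_id, events_for_user) -> output dict
    return [{"user_id": u, "all_events": evs} for u, evs in reduced.items()]


events = [{"user_id": "1", "type": "login"}, {"user_id": "1", "type": "logout"}]
result = trace_collect_user_events(events)
print(result)
```

Tracing the logic this way can also serve as a quick check of the mapper and reducer functions before wiring them into a dataflow.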