Module bytewax.dataflow

How to define dataflows.

Create a Dataflow instance, then use the methods on it to add computational steps.

Expand source code
"""How to define dataflows.

Create a `Dataflow` instance, then use the methods on it to add
computational steps.

"""

from .bytewax import Dataflow  # noqa: F401

__all__ = [
    "Dataflow",
]

Classes

class Dataflow

A definition of a Bytewax dataflow graph.

Use the methods defined on this class to add steps with operators of the same name.

Instance variables

var steps

The list of steps that make up this dataflow.

Methods

def collect_window(self, /, step_id, clock_config, window_config)

Collect window emits all items for a key in a window downstream in sorted order.

It is a stateful operator. It requires that the upstream items are (key: str, value) tuples so we can ensure that all relevant values are routed to the relevant state. It also requires a step ID to recover the correct state.

It emits (key, list) tuples downstream at the end of each window where list is sorted by the time assigned by the clock.

Currently, data is permanently allocated per-key. Be aware of this if you have an ever-growing key space.

Args

step_id : str
Uniquely identifies this step for recovery.
clock_config : ClockConfig
Clock config to use. See bytewax.window.
window_config : WindowConfig
Windower config to use. See bytewax.window.
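The operator's contract can be sketched in plain Python. The snippet below is a minimal simulation of the semantics described above, not Bytewax's implementation; `simulate_collect_window` and its arguments are illustrative names. It buckets (key, value) pairs into tumbling windows and emits each window's values sorted by their assigned time, assuming each value carries its timestamp under a "time" key.

```python
from datetime import datetime, timedelta, timezone


def simulate_collect_window(items, length, align_to):
    """Bucket (key, value) pairs into tumbling windows of `length`,
    then emit (key, values) per window with values sorted by time."""
    windows = {}  # (key, window_index) -> list of values
    for key, value in items:
        # Which tumbling window this timestamp falls into.
        index = (value["time"] - align_to) // length
        windows.setdefault((key, index), []).append(value)
    # Emit windows in order; each window's contents sorted by clock time.
    return [
        (key, sorted(values, key=lambda v: v["time"]))
        for (key, _index), values in sorted(
            windows.items(), key=lambda kv: kv[0][1]
        )
    ]


align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
inp = [
    # Deliberately out of time order within the first window.
    ("ALL", {"time": align_to + timedelta(seconds=8), "val": "b"}),
    ("ALL", {"time": align_to + timedelta(seconds=4), "val": "a"}),
    ("ALL", {"time": align_to + timedelta(seconds=12), "val": "c"}),
]
out = simulate_collect_window(inp, timedelta(seconds=10), align_to)
# First 10 second window emits ["a", "b"] in sorted order; second emits ["c"].
```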
def filter(self, /, predicate)

Filter selectively keeps only some items.

It calls a predicate function on each item.

It emits the item downstream unmodified if the predicate returns True.

It is commonly used for:

  • Selecting relevant events
  • Removing empty events
  • Removing sentinels
  • Removing stop words
>>> from bytewax.testing import TestingInput
>>> from bytewax.connectors.stdio import StdOutput
>>> from bytewax.testing import run_main
>>> from bytewax.dataflow import Dataflow
>>>
>>> flow = Dataflow()
>>> flow.input("inp", TestingInput(range(4)))
>>> def is_odd(item):
...     return item % 2 != 0
>>> flow.filter(is_odd)
>>> flow.output("out", StdOutput())
>>> run_main(flow)
1
3

Args

predicate
predicate(item: Any) => should_emit: bool
def filter_map(self, /, mapper)

Filter map acts as a normal map function, but if the mapper returns None, the item is filtered out.

>>> from bytewax.testing import TestingInput, TestingOutput, run_main
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow()
>>> flow.input("inp", TestingInput([0, {"key": "a"}, []]))
>>> def validate(data):
...     if type(data) != dict or "key" not in data:
...         return None
...     else:
...         return data["key"], data
...
>>> flow.filter_map(validate)
>>> out = []
>>> flow.output("out", TestingOutput(out))
>>> run_main(flow)
>>> assert out == [("a", {"key": "a"})]

Args

mapper
mapper(item: Any) => updated_item: Optional[Any]
def flat_map(self, /, mapper)

Flat map is a one-to-many transformation of items.

It calls a mapper function on each item.

It emits each element in the returned iterator individually downstream in the epoch of the input item.

It is commonly used for:

  • Tokenizing
  • Flattening hierarchical objects
  • Breaking up aggregations for further processing
>>> from bytewax.testing import TestingInput
>>> from bytewax.connectors.stdio import StdOutput
>>> from bytewax.testing import run_main
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow()
>>> inp = ["hello world"]
>>> flow.input("inp", TestingInput(inp))
>>> def split_into_words(sentence):
...     return sentence.split()
>>> flow.flat_map(split_into_words)
>>> flow.output("out", StdOutput())
>>> run_main(flow)
hello
world

Args

mapper
mapper(item: Any) => emit: Iterable[Any]
def fold_window(self, /, step_id, clock_config, window_config, builder, folder)

Fold window lets you combine all items for a key within a window into an accumulator, using a function to build its initial value.

It is like Dataflow.reduce_window() but uses a function to build the initial value.

It is a stateful operator. It requires the input stream has items that are (key: str, value) tuples so we can ensure that all relevant values are routed to the relevant state. It also requires a step ID to recover the correct state.

It calls two functions:

  • A builder function which is called the first time a key appears and is expected to return the empty state for that key.

  • A folder which combines a new value with an accumulator. The accumulator is initially the output of the builder function. Values will be passed in window order, but no order is defined within a window.

It emits (key, accumulator) tuples downstream when the window closes.

>>> from datetime import datetime, timedelta, timezone
>>> from bytewax.dataflow import Dataflow
>>> from bytewax.testing import run_main, TestingInput, TestingOutput
>>> from bytewax.window import TumblingWindow, EventClockConfig
>>> align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
>>>
>>> flow = Dataflow()
>>>
>>> inp = [
...     ("ALL", {"time": align_to, "val": "a"}),
...     ("ALL", {"time": align_to + timedelta(seconds=4), "val": "b"}),
...     ("ALL", {"time": align_to + timedelta(seconds=8), "val": "c"}),
...     # The 10 second window should close just before processing this item.
...     ("ALL", {"time": align_to + timedelta(seconds=12), "val": "d"}),
...     ("ALL", {"time": align_to + timedelta(seconds=16), "val": "e"})
... ]
>>>
>>> flow.input("inp", TestingInput(inp))
>>>
>>> clock_config = EventClockConfig(
...     lambda e: e["time"], wait_for_system_duration=timedelta(seconds=0)
... )
>>> window_config = TumblingWindow(length=timedelta(seconds=10), align_to=align_to)
>>>
>>> def add(acc, x):
...     acc.append(x["val"])
...     return acc
>>>
>>> flow.fold_window("sum", clock_config, window_config, list, add)
>>>
>>> out = []
>>> flow.output("out", TestingOutput(out))
>>>
>>> run_main(flow)
>>>
>>> assert sorted(out) == sorted([("ALL", ["a", "b", "c"]), ("ALL", ["d", "e"])])

Args

step_id
Uniquely identifies this step for recovery.
clock_config
Clock config to use. See bytewax.window.
window_config
Windower config to use. See bytewax.window.
builder
builder(key: Any) => initial_accumulator: Any
folder
folder(accumulator: Any, value: Any) => updated_accumulator: Any
def input(self, /, step_id, input)

Define an input.

At least one input is required on every dataflow.

Emits items downstream from the input source.

See bytewax.inputs for more information on how input works. See bytewax.connectors for a buffet of our built-in connector types.

Args

step_id : str
Uniquely identifies this step for recovery.
input : Input
Input definition.
def inspect(self, /, inspector)

Inspect allows you to observe, but not modify, items.

It calls an inspector callback on each item.

It emits items downstream unmodified.

It is commonly used for debugging.

>>> from bytewax.testing import TestingInput, TestingOutput
>>> from bytewax.testing import run_main
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow()
>>> flow.input("inp", TestingInput(range(3)))
>>> def log(item):
...     print("Saw", item)
>>> flow.inspect(log)
>>> out = []
>>> flow.output("out", TestingOutput(out))  # Notice we don't print out.
>>> run_main(flow)
Saw 0
Saw 1
Saw 2

Args

inspector
inspector(item: Any) => None
def inspect_epoch(self, /, inspector)

Inspect epoch allows you to observe, but not modify, items and their epochs.

It calls an inspector function on each item with its epoch.

It emits items downstream unmodified.

It is commonly used for debugging.

>>> from datetime import timedelta
>>> from bytewax.testing import TestingInput, TestingOutput, run_main
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow()
>>> flow.input("inp", TestingInput(range(3)))
>>> def log(epoch, item):
...    print(f"Saw {item} @ {epoch}")
>>> flow.inspect_epoch(log)
>>> out = []
>>> flow.output("out", TestingOutput(out))  # Notice we don't print out.
>>> run_main(flow, epoch_interval=timedelta(seconds=0))
Saw 0 @ 0
Saw 1 @ 1
Saw 2 @ 2

Args

inspector
inspector(epoch: int, item: Any) => None
def map(self, /, mapper)

Map is a one-to-one transformation of items.

It calls a mapper function on each item.

It emits each updated item downstream.

It is commonly used for:

  • Extracting keys
  • Turning JSON into objects
  • So many things
>>> from bytewax.connectors.stdio import StdOutput
>>> from bytewax.testing import run_main, TestingInput
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow()
>>> flow.input("inp", TestingInput(range(3)))
>>> def add_ten(item):
...     return item + 10
>>> flow.map(add_ten)
>>> flow.output("out", StdOutput())
>>> run_main(flow)
10
11
12

Args

mapper
mapper(item: Any) => updated_item: Any
def output(self, /, step_id, output)

Write data to an output.

At least one output is required on every dataflow.

Emits items downstream unmodified.

See bytewax.outputs for more information on how output works. See bytewax.connectors for a buffet of our built-in connector types.

Args

step_id : str
Uniquely identifies this step for recovery.
output : Output
Output definition.
def reduce(self, /, step_id, reducer, is_complete)

Reduce lets you combine items for a key into an accumulator.

It is a stateful operator. It requires the input stream has items that are (key: str, value) tuples so we can ensure that all relevant values are routed to the relevant state. It also requires a step ID to recover the correct state.

It calls two functions:

  • A reducer which combines a new value with an accumulator. The accumulator is initially the first value seen for a key. Values will be passed in an arbitrary order. If there is only a single value for a key since the last completion, this function will not be called.

  • An is complete function which returns True if the most recent (key, accumulator) should be emitted downstream and the accumulator for that key forgotten. If there was only a single value for a key, it is passed in as the accumulator here.

It emits (key, accumulator) tuples downstream when the is complete function returns True in the epoch of the most recent value for that key.

If the ordering of values is crucial, group beforehand using a windowing operator with a timeout like reduce_window, then sort, then use this operator.

It is commonly used for:

  • Collection into a list
  • Summarizing data
>>> from bytewax.dataflow import Dataflow
>>> from bytewax.testing import TestingInput, run_main
>>> from bytewax.connectors.stdio import StdOutput
>>> flow = Dataflow()
>>> inp = [
...     {"user": "a", "type": "login"},
...     {"user": "a", "type": "post"},
...     {"user": "b", "type": "login"},
...     {"user": "b", "type": "logout"},
...     {"user": "a", "type": "logout"},
... ]
>>> flow.input("inp", TestingInput(inp))
>>> def user_as_key(event):
...     return event["user"], [event]
>>> flow.map(user_as_key)
>>> def extend_session(session, events):
...     session.extend(events)
...     return session
>>> def session_complete(session):
...     return any(event["type"] == "logout" for event in session)
>>> flow.reduce("sessionizer", extend_session, session_complete)
>>> flow.output("out", StdOutput())
>>> run_main(flow)
('b', [{'user': 'b', 'type': 'login'}, {'user': 'b', 'type': 'logout'}])
('a', [{'user': 'a', 'type': 'login'}, {'user': 'a', 'type': 'post'},
       {'user': 'a', 'type': 'logout'}])

Args

step_id : str
Uniquely identifies this step for recovery.
reducer
reducer(accumulator: Any, value: Any) => updated_accumulator: Any
is_complete
is_complete(updated_accumulator: Any) => should_emit: bool
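The contract above, including the rule that the first value seen for a key becomes the accumulator (so the reducer is never called for a single-value key), can be sketched in plain Python. This is a simulation of the operator's semantics mirroring the sessionizer doctest, not Bytewax's implementation; `simulate_reduce` and its arguments are illustrative names.

```python
def simulate_reduce(items, reducer, is_complete):
    """Thread per-key accumulators through reducer, emitting and
    forgetting an accumulator whenever is_complete returns True."""
    accs = {}
    out = []
    for key, value in items:
        if key in accs:
            accs[key] = reducer(accs[key], value)
        else:
            # First value for a key becomes the accumulator directly;
            # reducer is not called until a second value arrives.
            accs[key] = value
        if is_complete(accs[key]):
            # Emit and forget this key's accumulator.
            out.append((key, accs.pop(key)))
    return out


events = [
    ("a", ["login"]),
    ("a", ["post"]),
    ("b", ["login"]),
    ("b", ["logout"]),
    ("a", ["logout"]),
]
out = simulate_reduce(
    events,
    lambda acc, vals: acc + vals,           # Extend the session.
    lambda acc: "logout" in acc,            # Session ends on logout.
)
# "b" completes first, then "a", matching the doctest's output order.
```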
def reduce_window(self, /, step_id, clock_config, window_config, reducer)

Reduce window lets you combine all items for a key within a window into an accumulator.

It is like Dataflow.reduce() but marks the accumulator as complete automatically at the end of each window.

It is a stateful operator. It requires the input stream has items that are (key: str, value) tuples so we can ensure that all relevant values are routed to the relevant state. It also requires a step ID to recover the correct state.

It calls a reducer function which combines two values. The accumulator is initially the first value seen for a key. Values will be passed in arbitrary order. If there is only a single value for a key in this window, this function will not be called.

It emits (key, accumulator) tuples downstream at the end of each window.

If the ordering of values is crucial, group in this operator, then sort afterwards.

Currently, data is permanently allocated per-key. Be aware of this if you have an ever-growing key space.

It is commonly used for:

  • Sessionization
>>> from datetime import datetime, timedelta, timezone
>>> from bytewax.testing import TestingInput, TestingOutput, run_main
>>> from bytewax.window import EventClockConfig, TumblingWindow
>>> align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
>>> flow = Dataflow()
>>> inp = [
...     ("b", {"time": align_to, "val": 1}),
...     ("a", {"time": align_to + timedelta(seconds=4), "val": 1}),
...     ("a", {"time": align_to + timedelta(seconds=8), "val": 1}),
...     ("b", {"time": align_to + timedelta(seconds=12), "val": 1}),
... ]
>>> flow.input("inp", TestingInput(inp))
>>> def add(acc, x):
...     acc["val"] += x["val"]
...     return acc
>>> clock_config = EventClockConfig(
...     lambda e: e["time"], wait_for_system_duration=timedelta(0)
... )
>>> window_config = TumblingWindow(
...     length=timedelta(seconds=10), align_to=align_to
... )
>>> flow.reduce_window("count", clock_config, window_config, add)
>>> def extract_val(key__event):
...    key, event = key__event
...    return (key, event["val"])
>>> flow.map(extract_val)
>>> out = []
>>> flow.output("out", TestingOutput(out))
>>> run_main(flow)
>>> assert sorted(out) == sorted([('b', 1), ('a', 2), ('b', 1)])

Args

step_id : str
Uniquely identifies this step for recovery.
clock_config : ClockConfig
Clock config to use. See bytewax.window.
window_config : WindowConfig
Windower config to use. See bytewax.window.
reducer
reducer(accumulator: Any, value: Any) => updated_accumulator: Any
def stateful_map(self, /, step_id, builder, mapper)

Stateful map is a one-to-one transformation of values, but allows you to reference a persistent state for each key when doing the transformation.

It is a stateful operator. It requires the input stream has items that are (key: str, value) tuples so we can ensure that all relevant values are routed to the relevant state. It also requires a step ID to recover the correct state.

It calls two functions:

  • A builder which returns a new state and will be called whenever a new key is encountered with the key as a parameter.

  • A mapper which transforms values. Values will be passed in an arbitrary order. If the updated state is None, the state will be forgotten.

It emits a (key, updated_value) tuple downstream for each input item.

If the ordering of values is crucial, group beforehand using a windowing operator with a timeout like reduce_window, then sort, then use this operator.

It is commonly used for:

  • Anomaly detection
  • State machines
>>> from bytewax.testing import TestingInput, run_main
>>> from bytewax.connectors.stdio import StdOutput
>>> flow = Dataflow()
>>> inp = [
...     "a",
...     "a",
...     "a",
...     "a",
...     "b",
... ]
>>> flow.input("inp", TestingInput(inp))
>>> def self_as_key(item):
...     return item, item
>>> flow.map(self_as_key)
>>> def build_count():
...     return 0
>>> def check(running_count, item):
...     running_count += 1
...     if running_count == 1:
...         return running_count, item
...     else:
...         return running_count, None
>>> flow.stateful_map("remove_duplicates", build_count, check)
>>> def remove_none_and_key(key_item):
...     key, item = key_item
...     if item is None:
...         return []
...     else:
...         return [item]
>>> flow.flat_map(remove_none_and_key)
>>> flow.output("out", StdOutput())
>>> run_main(flow)
a
b

Args

step_id : str
Uniquely identifies this step for recovery.
builder
builder(key: Any) => new_state: Any
mapper
mapper(state: Any, value: Any) => (updated_state: Any, updated_value: Any)
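The builder/mapper contract, including the rule that returning None as the updated state forgets that key's state, can be sketched in plain Python. This is a simulation of the operator's semantics, not Bytewax's implementation; `simulate_stateful_map` and the counter functions are illustrative names following the documented builder(key) signature.

```python
def simulate_stateful_map(items, builder, mapper):
    """Apply mapper to each (key, value), threading per-key state.

    A key's state is created by builder(key) on first sight and is
    forgotten again whenever mapper returns None as the updated state.
    """
    states = {}
    out = []
    for key, value in items:
        if key not in states:
            states[key] = builder(key)
        updated_state, updated_value = mapper(states[key], value)
        if updated_state is None:
            # Forget this key's state, per the operator's contract.
            del states[key]
        else:
            states[key] = updated_state
        out.append((key, updated_value))
    return out


# A running count per key that resets itself after three values.
def build(_key):
    return 0


def count_to_three(count, _value):
    count += 1
    if count >= 3:
        return None, count  # Dropping the state resets the count.
    return count, count


out = simulate_stateful_map(
    [("a", None)] * 4 + [("b", None)],
    build,
    count_to_three,
)
# "a" counts 1, 2, 3, then restarts at 1; "b" starts its own count.
```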