Module bytewax.dataflow
How to define dataflows.
Create a Dataflow instance, then use the methods on it to add computational steps.
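For example, a minimal dataflow, sketched with the TestingInputConfig, StdOutputConfig, and run_main helpers used throughout the examples below:
>>> from bytewax.dataflow import Dataflow
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> from bytewax.execution import run_main
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> flow.map(lambda item: item * 2)
>>> flow.capture(StdOutputConfig())
>>> run_main(flow)
0
2
4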
Classes
class Dataflow
-
A definition of a Bytewax dataflow graph.
Use the methods defined on this class to add steps with operators of the same name.
See the execution entry points in
bytewax.execution
to run.
Instance variables
var steps
-
The list of steps that make up this dataflow.
Methods
def capture(self, output_config)
-
Capture is how you specify the output of a dataflow.
At least one capture is required on every dataflow.
It emits items downstream unmodified; you can capture midway through a dataflow.
See
bytewax.outputs
for more information on how output works.
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> from bytewax.execution import run_main
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> flow.capture(StdOutputConfig())
>>> run_main(flow)
0
1
2
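To sketch capturing midway, assuming TestingOutputConfig from bytewax.outputs: each capture sees the stream at the point it was added, and the sorted calls hedge against cross-worker ordering:
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import TestingOutputConfig
>>> from bytewax.execution import run_main
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> raw = []
>>> flow.capture(TestingOutputConfig(raw))  # Captures items before the map.
>>> flow.map(lambda item: item * 2)
>>> doubled = []
>>> flow.capture(TestingOutputConfig(doubled))  # Captures items after the map.
>>> run_main(flow)
>>> sorted(raw)
[0, 1, 2]
>>> sorted(doubled)
[0, 2, 4]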
Args
output_config : OutputConfig - Output config to use. See bytewax.outputs.
def collect_window(self, step_id, clock_config, window_config)
-
Collect window emits all items for a key in a window downstream in sorted order.
It is a stateful operator. It requires that the upstream items are (key: str, value) tuples so we can ensure that all relevant values are routed to the relevant state. It also requires a step ID to recover the correct state.
It emits (key, list) tuples downstream at the end of each window, where list is sorted by the time assigned by the clock.
Currently, data is permanently allocated per-key. If you have an ever-growing key space, note this.
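A minimal sketch of usage, assuming SystemClockConfig and TumblingWindowConfig from bytewax.window as in the fold_window example below; since all items here arrive within a single window, each key's values are emitted together, and sorted is used because output order across keys is not guaranteed:
>>> from datetime import timedelta
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import TestingOutputConfig
>>> from bytewax.execution import run_main
>>> from bytewax.window import SystemClockConfig, TumblingWindowConfig
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig([("a", 1), ("a", 2), ("b", 3)]))
>>> clock_config = SystemClockConfig()
>>> window_config = TumblingWindowConfig(length=timedelta(seconds=10))
>>> flow.collect_window("collect", clock_config, window_config)
>>> out = []
>>> flow.capture(TestingOutputConfig(out))
>>> run_main(flow)
>>> sorted(out)
[('a', [1, 2]), ('b', [3])]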
Args
step_id : str - Uniquely identifies this step for recovery.
clock_config : ClockConfig - Clock config to use. See bytewax.window.
window_config : WindowConfig - Windower config to use. See bytewax.window.
def filter(self, predicate)
-
Filter selectively keeps only some items.
It calls a predicate function on each item.
It emits the item downstream unmodified if the predicate returns True.
It is commonly used for:
- Selecting relevant events
- Removing empty events
- Removing sentinels
- Removing stop words
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> from bytewax.execution import run_main
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(4)))
>>> def is_odd(item):
...     return item % 2 != 0
>>> flow.filter(is_odd)
>>> flow.capture(StdOutputConfig())
>>> run_main(flow)
1
3
Args
predicate : predicate(item: Any) => should_emit: bool
def flat_map(self, mapper)
-
Flat map is a one-to-many transformation of items.
It calls a mapper function on each item.
It emits each element in the returned iterator individually downstream in the epoch of the input item.
It is commonly used for:
- Tokenizing
- Flattening hierarchical objects
- Breaking up aggregations for further processing
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> from bytewax.execution import run_main
>>> flow = Dataflow()
>>> inp = ["hello world"]
>>> flow.input("inp", TestingInputConfig(inp))
>>> def split_into_words(sentence):
...     return sentence.split()
>>> flow.flat_map(split_into_words)
>>> flow.capture(StdOutputConfig())
>>> run_main(flow)
hello
world
Args
mapper : mapper(item: Any) => emit: Iterable[Any]
def fold_window(self, step_id, clock_config, window_config, builder, folder)
-
Fold window lets you combine all items for a key within a window into an accumulator, using a function to build its initial value.
It is like
bytewax.Dataflow.reduce_window()
but uses a function to build the initial value.
It is a stateful operator. It requires that the input stream has items that are (key: str, value) tuples so we can ensure that all relevant values are routed to the relevant state. It also requires a step ID to recover the correct state.
It calls two functions:
- A builder function, which is called the first time a key appears and is expected to return the empty state for that key.
- A folder, which combines a new value with an accumulator. The accumulator is initially the output of the builder function. Values will be passed in window order, but no order is defined within a window.
It emits (key, accumulator) tuples downstream when the window closes.
>>> from collections import defaultdict
>>> from datetime import timedelta
>>> from time import sleep
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import TestingOutputConfig
>>> from bytewax.execution import run_main
>>> from bytewax.window import SystemClockConfig, TumblingWindowConfig
>>> def gen():
...     yield {"user": "a", "type": "login"}
...     sleep(4)
...     yield {"user": "a", "type": "post"}
...     sleep(4)
...     yield {"user": "a", "type": "post"}
...     sleep(4)
...     yield {"user": "b", "type": "login"}
...     sleep(4)
...     yield {"user": "a", "type": "post"}
...     sleep(4)
...     yield {"user": "b", "type": "post"}
...     sleep(4)
...     yield {"user": "b", "type": "post"}
>>> def extract_id(event):
...     return (event["user"], event)
>>> def build(key):
...     return defaultdict(lambda: 0)
>>> def count(results, event):
...     results[event["type"]] += 1
...     return results
>>> clock_config = SystemClockConfig()
>>> window_config = TumblingWindowConfig(length=timedelta(seconds=10))
>>> out = []
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(gen()))
>>> flow.map(extract_id)
>>> flow.fold_window("sum", clock_config, window_config, build, count)
>>> flow.capture(TestingOutputConfig(out))
>>> run_main(flow)
>>> assert len(out) == 3
>>> assert ("a", {"login": 1, "post": 2}) in out
>>> assert ("a", {"post": 1}) in out
>>> assert ("b", {"login": 1, "post": 2}) in out
Args
step_id : str - Uniquely identifies this step for recovery.
clock_config : ClockConfig - Clock config to use. See bytewax.window.
window_config : WindowConfig - Windower config to use. See bytewax.window.
builder : builder(key: Any) => initial_accumulator: Any
folder : folder(accumulator: Any, value: Any) => updated_accumulator: Any
def input(self, step_id, input_config)
-
Input introduces data into the dataflow.
At least one input is required on every dataflow.
Emits items downstream from the input source.
See
bytewax.inputs
for more information on how input works.
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> from bytewax.execution import run_main
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> flow.capture(StdOutputConfig())
>>> run_main(flow)
0
1
2
Args
step_id : str - Uniquely identifies this step for recovery.
input_config : bytewax.inputs.InputConfig - Input config to use. See bytewax.inputs.
def inspect(self, inspector)
-
Inspect allows you to observe, but not modify, items.
It calls an inspector callback on each item.
It emits items downstream unmodified.
It is commonly used for debugging.
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import TestingOutputConfig
>>> from bytewax.execution import run_main
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> def log(item):
...     print("Saw", item)
>>> flow.inspect(log)
>>> out = []
>>> flow.capture(TestingOutputConfig(out))  # Notice we don't print out.
>>> run_main(flow)
Saw 0
Saw 1
Saw 2
Args
inspector : inspector(item: Any) => None
def inspect_epoch(self, inspector)
-
Inspect epoch allows you to observe, but not modify, items and their epochs.
It calls an inspector function on each item with its epoch.
It emits items downstream unmodified.
It is commonly used for debugging.
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import TestingOutputConfig
>>> from bytewax.execution import run_main, TestingEpochConfig
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> def log(epoch, item):
...     print(f"Saw {item} @ {epoch}")
>>> flow.inspect_epoch(log)
>>> out = []
>>> flow.capture(TestingOutputConfig(out))  # Notice we don't print out.
>>> run_main(flow, epoch_config=TestingEpochConfig())
Saw 0 @ 0
Saw 1 @ 1
Saw 2 @ 2
Args
inspector : inspector(epoch: int, item: Any) => None
def map(self, mapper)
-
Map is a one-to-one transformation of items.
It calls a mapper function on each item.
It emits each updated item downstream.
It is commonly used for:
- Extracting keys
- Turning JSON into objects
- So many things
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> from bytewax.execution import run_main
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> def add_ten(item):
...     return item + 10
>>> flow.map(add_ten)
>>> flow.capture(StdOutputConfig())
>>> run_main(flow)
10
11
12
Args
mapper : mapper(item: Any) => updated_item: Any
def reduce(self, step_id, reducer, is_complete)
-
Reduce lets you combine items for a key into an accumulator.
It is a stateful operator. It requires that the input stream has items that are (key: str, value) tuples so we can ensure that all relevant values are routed to the relevant state. It also requires a step ID to recover the correct state.
It calls two functions:
- A reducer, which combines a new value with an accumulator. The accumulator is initially the first value seen for a key. Values will be passed in an arbitrary order. If there is only a single value for a key since the last completion, this function will not be called.
- An is-complete function, which returns True if the most recent (key, accumulator) should be emitted downstream and the accumulator for that key forgotten. If there was only a single value for a key, it is passed in as the accumulator here.
It emits (key, accumulator) tuples downstream when the is-complete function returns True, in the epoch of the most recent value for that key.
If the ordering of values is crucial, group beforehand using a windowing operator with a timeout like reduce_window, then sort, then use this operator.
It is commonly used for:
- Collection into a list
- Summarizing data
>>> from bytewax.dataflow import Dataflow
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> from bytewax.execution import run_main
>>> flow = Dataflow()
>>> inp = [
...     {"user": "a", "type": "login"},
...     {"user": "a", "type": "post"},
...     {"user": "b", "type": "login"},
...     {"user": "b", "type": "logout"},
...     {"user": "a", "type": "logout"},
... ]
>>> flow.input("inp", TestingInputConfig(inp))
>>> def user_as_key(event):
...     return event["user"], [event["type"]]
>>> flow.map(user_as_key)
>>> def extend_session(session, types):
...     session.extend(types)
...     return session
>>> def session_complete(session):
...     return "logout" in session
>>> flow.reduce("sessionizer", extend_session, session_complete)
>>> flow.capture(StdOutputConfig())
>>> run_main(flow)
('b', ['login', 'logout'])
('a', ['login', 'post', 'logout'])
Args
step_id : str - Uniquely identifies this step for recovery.
reducer : reducer(accumulator: Any, value: Any) => updated_accumulator: Any
is_complete : is_complete(updated_accumulator: Any) => should_emit: bool
def reduce_window(self, step_id, clock_config, window_config, reducer)
-
Reduce window lets you combine all items for a key within a window into an accumulator.
It is like
bytewax.Dataflow.reduce()
but marks the accumulator as complete automatically at the end of each window.
It is a stateful operator. It requires that the input stream has items that are (key: str, value) tuples so we can ensure that all relevant values are routed to the relevant state. It also requires a step ID to recover the correct state.
It calls a reducer function which combines two values. The accumulator is initially the first value seen for a key. Values will be passed in arbitrary order. If there is only a single value for a key in this window, this function will not be called.
It emits (key, accumulator) tuples downstream at the end of each window.
If the ordering of values is crucial, group in this operator, then sort afterwards.
Currently, data is permanently allocated per-key. If you have an ever-growing key space, note this.
It is commonly used for:
- Sessionization
>>> from datetime import datetime, timedelta
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> from bytewax.execution import run_main
>>> from bytewax.window import TestingClockConfig, TumblingWindowConfig
>>> flow = Dataflow()
>>> inp = [
...     {"user": "b", "type": "login"},  # 1:00pm
...     {"user": "a", "type": "login"},  # 1:01pm
...     {"user": "a", "type": "post"},  # 1:02pm
...     {"user": "b", "type": "post"},  # 1:03pm
... ]
>>> flow.input("inp", TestingInputConfig(inp))
>>> def add_initial_count(event):
...     return event["user"], 1
>>> flow.map(add_initial_count)
>>> def count(count, event_count):
...     return count + event_count
>>> clock_config = TestingClockConfig(
...     item_incr=timedelta(minutes=1),
...     start=datetime(2022, 1, 1, 13),
... )
>>> window_config = TumblingWindowConfig(
...     length=timedelta(minutes=2),
... )
>>> flow.reduce_window("count", clock_config, window_config, count)
>>> flow.capture(StdOutputConfig())
>>> run_main(flow)
('b', 1)
('a', 2)
('b', 1)
Args
step_id : str - Uniquely identifies this step for recovery.
clock_config : ClockConfig - Clock config to use. See bytewax.window.
window_config : WindowConfig - Windower config to use. See bytewax.window.
reducer : reducer(accumulator: Any, value: Any) => updated_accumulator: Any
def stateful_map(self, step_id, builder, mapper)
-
Stateful map is a one-to-one transformation of values, but allows you to reference a persistent state for each key when doing the transformation.
It is a stateful operator. It requires that the input stream has items that are (key: str, value) tuples so we can ensure that all relevant values are routed to the relevant state. It also requires a step ID to recover the correct state.
It calls two functions:
- A builder, which returns a new state and will be called whenever a new key is encountered, with the key as a parameter.
- A mapper, which transforms values. Values will be passed in an arbitrary order. If the updated state is None, the state will be forgotten.
It emits a (key, updated_value) tuple downstream for each input item.
If the ordering of values is crucial, group beforehand using a windowing operator with a timeout like reduce_window, then sort, then use this operator.
It is commonly used for:
- Anomaly detection
- State machines
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> from bytewax.execution import run_main
>>> flow = Dataflow()
>>> inp = [
...     "a",
...     "a",
...     "a",
...     "a",
...     "b",
... ]
>>> flow.input("inp", TestingInputConfig(inp))
>>> def self_as_key(item):
...     return item, item
>>> flow.map(self_as_key)
>>> def build_count(key):
...     return 0
>>> def check(running_count, item):
...     running_count += 1
...     if running_count == 1:
...         return running_count, item
...     else:
...         return running_count, None
>>> flow.stateful_map("remove_duplicates", build_count, check)
>>> def remove_none_and_key(key_item):
...     key, item = key_item
...     if item is None:
...         return []
...     else:
...         return [item]
>>> flow.flat_map(remove_none_and_key)
>>> flow.capture(StdOutputConfig())
>>> run_main(flow)
a
b
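To sketch the state-forgetting behavior (build_zero and count_and_forget are illustrative names, not part of the API): because the mapper always returns None as the updated state, each value for "a" sees freshly built state, so the count never accumulates:
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> from bytewax.execution import run_main
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(["a", "a", "a"]))
>>> flow.map(lambda item: (item, item))
>>> def build_zero(key):
...     return 0
>>> def count_and_forget(count, value):
...     # Returning None as the updated state forgets this key's
...     # state, so build_zero runs again on the next "a" and the
...     # count restarts instead of accumulating.
...     return None, count + 1
>>> flow.stateful_map("forgetful_counter", build_zero, count_and_forget)
>>> flow.capture(StdOutputConfig())
>>> run_main(flow)
('a', 1)
('a', 1)
('a', 1)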
Args
step_id : str - Uniquely identifies this step for recovery.
builder : builder(key: Any) => new_state: Any
mapper : mapper(state: Any, value: Any) => (updated_state: Any, updated_value: Any)