Module bytewax.window
Time-based windows.
Bytewax provides some operators and pre-built configurations for easily grouping data into buckets called windows and running code on just the values in those windows.
See the operator methods on Dataflow with _window in the name for simple example use cases of each.
Use
1. Pick a clock and create a config for it. A clock determines the time of each element and the current time used for closing each window. E.g. use the current system time. See the docs for each subclass of ClockConfig for options.
2. Pick a windower and create a config for it. A windower defines how to take the values and their times and bucket them into windows. E.g. have tumbling windows every 30 seconds. See the docs for each subclass of WindowConfig for options.
3. Pick a key to route the values for the window and make sure the input to the windowing operator you choose is a 2-tuple of (key: str, value). Windows are managed independently for each key. If you need all data to be processed into the same window state, you can use a constant key like ("ALL", value), but this will reduce the parallelism possible in the dataflow. This is similar to all the other stateful operators, so you can read more on their methods on Dataflow.
4. Pass both these configs to the windowing operator of your choice. The windowing operators decide what kind of logic you should apply to values within a window and what should be the output of the window. E.g. Dataflow.reduce_window() combines all values in a window into a single output and sends that downstream.
You are allowed and encouraged to have as many different clocks and windowers as you need in a single dataflow. Just instantiate more of them and pass the ones you need for each situation to each windowing operator.
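To make the four steps above concrete, here is a minimal pure-Python sketch of what they amount to conceptually. It does not use or reproduce the Bytewax API: `clock`, `windower`, `keyed`, `reduce_windows`, and the event layout are hypothetical names chosen only for illustration.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical settings for the sketch; not Bytewax configuration objects.
ALIGN_TO = datetime(2022, 1, 1, tzinfo=timezone.utc)
LENGTH = timedelta(seconds=30)


def clock(event):
    """Step 1: decide the time of each element (here, a field on the event)."""
    return event["time"]


def windower(ts):
    """Step 2: bucket a timestamp into a 30-second tumbling-window index."""
    return (ts - ALIGN_TO) // LENGTH


def keyed(event):
    """Step 3: route each value by a string key as a (key, value) 2-tuple."""
    return (event["user"], event)


def reduce_windows(events, reducer):
    """Step 4: combine all values in each (key, window) into a single output."""
    state = {}
    for key, event in map(keyed, events):
        slot = (key, windower(clock(event)))
        value = event["amount"]
        state[slot] = reducer(state[slot], value) if slot in state else value
    return state


events = [
    {"user": "a", "time": ALIGN_TO + timedelta(seconds=5), "amount": 1},
    {"user": "a", "time": ALIGN_TO + timedelta(seconds=20), "amount": 2},
    {"user": "b", "time": ALIGN_TO + timedelta(seconds=25), "amount": 10},
    {"user": "a", "time": ALIGN_TO + timedelta(seconds=35), "amount": 4},
]
totals = reduce_windows(events, lambda x, y: x + y)
```

Each key gets its own window state, so the two keys never mix even though their events land in the same 30-second bucket.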
Order
Because Bytewax can be run as a distributed system with multiple worker processes and threads all reading relevant data simultaneously, you have to specifically collect and manually sort data that you need to process in strict time order.
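One way to do this, sketched below under the assumption that each value carries its own timestamp, is to collect `(timestamp, value)` pairs and sort them before processing. The helper name `in_time_order` and the sample data are hypothetical and not part of Bytewax.

```python
from datetime import datetime, timedelta, timezone

start = datetime(2022, 1, 1, tzinfo=timezone.utc)

# Values as they might arrive when several workers read input concurrently.
arrived = [
    (start + timedelta(seconds=3), "c"),
    (start + timedelta(seconds=1), "a"),
    (start + timedelta(seconds=2), "b"),
]


def in_time_order(items):
    """Sort collected (timestamp, value) pairs and return just the values."""
    return [value for _ts, value in sorted(items, key=lambda item: item[0])]


ordered = in_time_order(arrived)
```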
Recovery
Bytewax's windowing system is built on top of its recovery system (see bytewax.run for more info), so failure in the middle of a window will be handled as gracefully as possible.
Some clocks don't have a single correct answer on what to do during recovery. E.g. if you use SystemClockConfig with 10 minute windows, but then recover on a 15 minute mark, the system will immediately close out the half-completed window stored during recovery. See the docs for each ClockConfig subclass for specific notes on recovery.
Recovery happens on the granularity of the epochs of the dataflow, not the windows. Epoch interval has no effect on windowing operator behavior when there are no failures; it is solely an implementation detail of the recovery system. See bytewax.run for more information on epochs.
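The SystemClockConfig scenario described above can be illustrated with a small sketch. It assumes a stored window is closed as soon as the clock's current time reaches the window's exclusive end; `windows_to_close` is a hypothetical helper, not Bytewax's recovery code.

```python
from datetime import datetime, timedelta, timezone

WINDOW_LENGTH = timedelta(minutes=10)


def windows_to_close(open_window_starts, system_now):
    """Return the starts of stored windows whose exclusive end has passed."""
    return [s for s in open_window_starts if s + WINDOW_LENGTH <= system_now]


start = datetime(2022, 1, 1, 12, 0, tzinfo=timezone.utc)
stored = [start]  # window state saved before the failure

# Without a failure, the window is still open at the 5 minute mark.
closed_at_5_min = windows_to_close(stored, start + timedelta(minutes=5))

# Resuming at the 15 minute mark: the system clock is already past the
# window's end, so the half-completed window closes immediately.
closed_on_resume = windows_to_close(stored, start + timedelta(minutes=15))
```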
Expand source code
"""Time-based windows.
Bytewax provides some operators and pre-built configurations for
easily grouping data into buckets called **windows** and running code
on just the values in those windows.
See the operator methods on `bytewax.dataflow.Dataflow` with `_window`
in the name for simple example use cases of each.
Use
---
1. Pick a clock and create a config for it. A **clock** determines the
time of each element and the current time used for closing each
window. E.g. use the current system time. See the docs for each
subclass of `ClockConfig` for options.
2. Pick a windower and create a config for it. A **windower** defines
how to take the values and their times and bucket them into
windows. E.g. have tumbling windows every 30 seconds. See the docs for
each subclass of `WindowConfig` for options.
3. Pick a **key** to route the values for the window and make sure the
input to the windowing operator you choose is a 2-tuple of `(key: str,
value)`. Windows are managed independently for each key. If you need
all data to be processed into the same window state, you can use a
constant key like `("ALL", value)` but this will reduce the
parallelism possible in the dataflow. This is similar to all the other
stateful operators, so you can read more on their methods on
`bytewax.dataflow.Dataflow`.
4. Pass both these configs to the windowing operator of your
choice. The **windowing operators** decide what kind of logic you
should apply to values within a window and what should be the output
of the window. E.g. `bytewax.dataflow.Dataflow.reduce_window` combines
all values in a window into a single output and sends that downstream.
You are allowed and encouraged to have as many different clocks and
windowers as you need in a single dataflow. Just instantiate more of
them and pass the ones you need for each situation to each windowing
operator.
Order
-----
Because Bytewax can be run as a distributed system with multiple
worker processes and threads all reading relevant data simultaneously,
you have to specifically collect and manually sort data that you need
to process in strict time order.
Recovery
--------
Bytewax's windowing system is built on top of its recovery system (see
`bytewax.run` for more info), so failure in the middle of a window
will be handled as gracefully as possible.
Some clocks don't have a single correct answer on what to do during
recovery. E.g. if you use `SystemClockConfig` with 10 minute windows,
but then recover on a 15 minute mark, the system will immediately
close out the half-completed window stored during recovery. See the
docs for each `ClockConfig` subclass for specific notes on recovery.
Recovery happens on the granularity of the _epochs_ of the dataflow,
not the windows. Epoch interval has no effect on windowing operator
behavior when there are no failures; it is solely an implementation
detail of the recovery system. See `bytewax.run` for more information
on epochs.
"""
from .bytewax import ( # noqa: F401
ClockConfig,
EventClockConfig,
SessionWindow,
SlidingWindow,
SystemClockConfig,
TumblingWindow,
WindowConfig,
)
__all__ = [
"ClockConfig",
"EventClockConfig",
"SessionWindow",
"SlidingWindow",
"SystemClockConfig",
"TumblingWindow",
"WindowConfig",
]
Classes
class ClockConfig
-
Base class for a clock config.
This describes how a windowing operator should determine the current time and the time for each element.
Use a specific subclass of this that matches the time definition you'd like to use.
Subclasses
class EventClockConfig (dt_getter, wait_for_system_duration)
-
Use a getter function to lookup the timestamp for each item.
The watermark is the largest item timestamp seen thus far, minus the waiting duration, plus the system time duration that has elapsed since that item was seen. This effectively means items will be correctly processed as long as they are not out of order more than the waiting duration in system time.
If the dataflow has no more input, all windows are closed.
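The watermark rule above can be written out as arithmetic. The following is a sketch only: `WatermarkSketch`, its methods, and the event layout are hypothetical, and treating items behind the watermark as late is an assumption drawn from the description, not a statement about Bytewax internals. The `dt_getter` shown is one possible getter that returns a datetime with tzinfo set to `timezone.utc`.

```python
from datetime import datetime, timedelta, timezone


def dt_getter(event):
    """Hypothetical getter: events are dicts with epoch seconds under "ts"."""
    return datetime.fromtimestamp(event["ts"], tz=timezone.utc)


class WatermarkSketch:
    """Tracks the largest item timestamp and when it was seen."""

    def __init__(self, wait_for_system_duration):
        self.wait = wait_for_system_duration
        self.max_event_time = None
        self.seen_at = None

    def observe(self, event_time, system_now):
        # Remember the largest timestamp and the system time it arrived at.
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
            self.seen_at = system_now

    def watermark(self, system_now):
        # Largest timestamp, minus the wait, plus system time elapsed since.
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.wait + (system_now - self.seen_at)


sys_start = datetime(2022, 1, 1, 12, 0, tzinfo=timezone.utc)
sketch = WatermarkSketch(timedelta(seconds=5))
sketch.observe(dt_getter({"ts": 1_000_000_010}), sys_start)

# Two seconds of system time later, the watermark has advanced by two seconds.
wm = sketch.watermark(sys_start + timedelta(seconds=2))

# Assumption: an item whose timestamp is behind the watermark counts as late.
is_late = dt_getter({"ts": 1_000_000_006}) < wm
is_on_time = dt_getter({"ts": 1_000_000_008}) >= wm
```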
Args
dt_getter: Python function to get a datetime from an event. The datetime returned must have tzinfo set to timezone.utc. E.g. datetime(1970, 1, 1, tzinfo=timezone.utc)
wait_for_system_duration: How much system time to wait before considering an event late.
Returns
Config object. Pass this as the clock_config parameter to your windowing operator.
Ancestors
Instance variables
var dt_getter
-
Return an attribute of instance, which is of type owner.
var wait_for_system_duration
-
Return an attribute of instance, which is of type owner.
class SessionWindow (gap)
-
Session windowing with a fixed inactivity gap. Each time a new item is received, it is added to the latest window if the time since the latest event is < gap. Otherwise a new window is created that starts at the current clock's time.
Args
gap (datetime.timedelta): Gap of inactivity before considering a session closed. The gap should not be negative.
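The grouping rule can be sketched in a few lines of plain Python. This assumes timestamps arrive in order and uses the hypothetical helper `sessionize`; it illustrates the strict `< gap` comparison rather than Bytewax's actual implementation.

```python
from datetime import datetime, timedelta, timezone


def sessionize(timestamps, gap):
    """Group in-order timestamps into sessions split by inactivity >= gap."""
    sessions = []
    for ts in timestamps:
        # Join the latest session only if the time since its last item is < gap.
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


base = datetime(2022, 1, 1, tzinfo=timezone.utc)
offsets = [0, 2, 4, 9, 20, 21]
stamps = [base + timedelta(seconds=s) for s in offsets]
sessions = sessionize(stamps, gap=timedelta(seconds=5))

# Express each session as second offsets for readability.
session_offsets = [
    [int((ts - base).total_seconds()) for ts in session] for session in sessions
]
```

The item at 9 seconds arrives exactly one gap after the item at 4 seconds, so under the strict comparison it starts a new session.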
Returns
Config object. Pass this as the window_config parameter to your windowing operator.
Ancestors
Instance variables
var gap
-
Return an attribute of instance, which is of type owner.
class SlidingWindow (length, offset, align_to)
-
Sliding windows of fixed duration.
If offset == length, windows cover all time but do not overlap. Each item will fall in exactly one window. The TumblingWindow config will do this for you.
If offset < length, windows overlap. Each item will fall in multiple windows.
If offset > length, there will be gaps between windows. Each item can fall in up to one window, but might fall into none.
Window start times are inclusive, but end times are exclusive.
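The three cases can be checked with a short sketch that lists every window containing a given timestamp. `sliding_windows` is a hypothetical helper written for this illustration; it assumes windows start at `align_to + i * offset` for every integer `i` and are half-open, `[start, start + length)`.

```python
from datetime import datetime, timedelta, timezone


def sliding_windows(ts, length, offset, align_to):
    """Return start times of all windows [start, start + length) holding ts."""
    # Index of the latest window that starts at or before ts.
    i = (ts - align_to) // offset
    starts = []
    # Walk backwards while the window's exclusive end is still after ts.
    while align_to + i * offset + length > ts:
        starts.append(align_to + i * offset)
        i -= 1
    return list(reversed(starts))


align = datetime(2022, 1, 1, tzinfo=timezone.utc)
sec = lambda n: timedelta(seconds=n)

# offset == length: exactly one window.
tumbling = sliding_windows(align + sec(25), sec(10), sec(10), align)
# offset < length: overlapping windows, so the item falls in several.
overlapping = sliding_windows(align + sec(25), sec(10), sec(5), align)
# offset > length: gaps between windows, so the item may fall in none.
in_gap = sliding_windows(align + sec(15), sec(10), sec(20), align)
```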
Args
length (datetime.timedelta): Length of windows.
offset (datetime.timedelta): Duration between start times of adjacent windows.
align_to (datetime.datetime): Align windows so this instant starts a window. This must be a constant. You can use this to align all windows to hour boundaries, e.g.
Returns
Config object. Pass this as the window_config parameter to your windowing operator.
Ancestors
Instance variables
var align_to
-
Return an attribute of instance, which is of type owner.
var length
-
Return an attribute of instance, which is of type owner.
var offset
-
Return an attribute of instance, which is of type owner.
class SystemClockConfig
-
Use the current system time as the timestamp for each item.
The watermark is also the current system time.
If the dataflow has no more input, all windows are closed.
Returns
Config object. Pass this as the clock_config parameter to your windowing operator.
Ancestors
class TumblingWindow (length, align_to)
-
Tumbling windows of fixed duration.
Each item will fall in exactly one window.
Window start times are inclusive, but end times are exclusive.
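As an illustration of the inclusive start and exclusive end, the sketch below computes the bounds of the window containing a timestamp for hour-long windows aligned to midnight UTC. `tumbling_window` is a hypothetical helper and the alignment instant is an arbitrary choice for the example.

```python
from datetime import datetime, timedelta, timezone


def tumbling_window(ts, length, align_to):
    """Return (start, end) of the half-open window [start, end) holding ts."""
    index = (ts - align_to) // length  # floors, so it also works before align_to
    start = align_to + index * length
    return start, start + length


align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
hour = timedelta(hours=1)

# The last microsecond before 14:00 still belongs to the 13:00 window.
just_before = tumbling_window(
    datetime(2022, 1, 1, 13, 59, 59, 999999, tzinfo=timezone.utc), hour, align_to
)
# 14:00 exactly starts the next window.
on_boundary = tumbling_window(
    datetime(2022, 1, 1, 14, 0, tzinfo=timezone.utc), hour, align_to
)
# Timestamps earlier than align_to are bucketed on the same grid.
before_align = tumbling_window(
    datetime(2021, 12, 31, 23, 30, tzinfo=timezone.utc), hour, align_to
)
```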
Args
length (datetime.timedelta): Length of windows.
align_to (datetime.datetime): Align windows so this instant starts a window. This must be a constant. You can use this to align all windows to hour boundaries, e.g.
Returns
Config object. Pass this as the window_config parameter to your windowing operator.
Ancestors
Instance variables
var align_to
-
Return an attribute of instance, which is of type owner.
var length
-
Return an attribute of instance, which is of type owner.
class WindowConfig
-
Base class for a windower config.
This describes the type of windows you would like.
Use a specific subclass of this that matches the window definition you'd like to use.
Subclasses