Module bytewax.testing
Helper tools for testing dataflows.
Source code
"""Helper tools for testing dataflows."""
from datetime import datetime, timedelta, timezone
from itertools import islice
from typing import Any, Iterable, Iterator
from bytewax.inputs import (
PartitionedInput,
StatefulSource,
batch,
)
from bytewax.outputs import DynamicOutput, StatelessSink
from .bytewax import cluster_main, run_main
__all__ = [
"run_main",
"cluster_main",
"ffwd_iter",
"poll_next_batch",
"TestingInput",
"TestingOutput",
]
def ffwd_iter(it: Iterator[Any], n: int) -> None:
"""Skip an iterator forward some number of items.
Args:
it:
A stateful iterator to advance.
n:
Number of items to skip from the current position.
"""
# Taken from `consume`
# https://docs.python.org/3/library/itertools.html#itertools-recipes
# Apparently faster than a for loop.
next(islice(it, n, n), None)
class _IterSource(StatefulSource):
def __init__(self, ib, batch_size, resume_state):
self._start_idx = 0 if resume_state is None else resume_state
it = iter(ib)
# Resume to one after the last completed read index.
ffwd_iter(it, self._start_idx)
self._batcher = batch(it, batch_size)
def next_batch(self):
batch = next(self._batcher)
self._start_idx += len(batch)
return batch
def snapshot(self):
return self._start_idx
class TestingInput(PartitionedInput):
"""Produce input from a Python iterable.
You only want to use this for unit testing.
The iterable must be identical on all workers.
There is no parallelism; only one worker will actually consume the
iterable.
Be careful using a generator as the iterable; if you fail and
attempt to resume the dataflow without rebuilding it, the
half-consumed generator will be re-used on recovery and early
input will be lost so resume will see the correct data.
"""
__test__ = False
def __init__(self, ib: Iterable[Any], batch_size: int = 1):
"""Init.
Args:
ib:
Iterable for input.
batch_size:
Number of items from the iterable to emit in each
batch. Defaults to 1.
"""
self._ib = ib
self._batch_size = batch_size
def list_parts(self):
"""The iterable is read on a single worker."""
return ["iterable"]
def build_part(self, for_key, resume_state):
"""See ABC docstring."""
assert for_key == "iterable"
return _IterSource(self._ib, self._batch_size, resume_state)
class _ListSink(StatelessSink):
def __init__(self, ls):
self._ls = ls
def write_batch(self, items):
self._ls += items
class TestingOutput(DynamicOutput):
"""Append each output item to a list.
You only want to use this for unit testing.
Can support at-least-once processing. The list is not cleared
between executions.
"""
__test__ = False
def __init__(self, ls):
"""Init.
Args:
ls: List to append to.
"""
self._ls = ls
def build(self, worker_index, worker_count):
"""See ABC docstring."""
return _ListSink(self._ls)
def poll_next_batch(source: StatefulSource, timeout=timedelta(seconds=5)):
"""Repeatedly poll an input source until it returns a batch.
You'll want to use this in unit tests of sources when there's some
non-determinism in how items are read.
This is a busy-loop.
Args:
source: To call `StatefulSource.next` on.
timeout: How long to continuously poll for.
Returns:
The next batch found.
Raises:
TimeoutError: If no batch was returned within the timeout.
"""
batch = []
start = datetime.now(timezone.utc)
while len(batch) <= 0:
if datetime.now(timezone.utc) - start > timeout:
raise TimeoutError()
batch = source.next_batch()
return batch
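To see the resume bookkeeping in _IterSource in action, you can drive a partition by hand: build it, read a batch, snapshot, then rebuild from the snapshot. A minimal sketch, not part of the module; it assumes the batch helper yields lists:

from bytewax.testing import TestingInput

inp = TestingInput(range(6), batch_size=2)

part = inp.build_part("iterable", None)
print(part.next_batch())  # [0, 1]
state = part.snapshot()   # 2: one past the last completed read index

# Rebuilding from the snapshot fast-forwards past the consumed items.
resumed = inp.build_part("iterable", state)
print(resumed.next_batch())  # [2, 3]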
Functions
def cluster_main(flow, addresses, proc_id, *, epoch_interval=None, recovery_config=None, worker_count_per_proc=1)
-
Execute a dataflow in the current process as part of a cluster.
This is only used for unit testing. See bytewax.run.

Blocks until execution is complete.

>>> from bytewax.dataflow import Dataflow
>>> from bytewax.testing import TestingInput
>>> from bytewax.connectors.stdio import StdOutput
>>> flow = Dataflow()
>>> flow.input("inp", TestingInput(range(3)))
>>> flow.capture(StdOutput())
>>> addresses = []  # In a real example, you'd find the "host:port" of all other Bytewax workers.
>>> proc_id = 0  # In a real example, you'd assign each worker a distinct ID from 0..proc_count.
>>> cluster_main(flow, addresses, proc_id)
0
1
2
Args
flow
- Dataflow to run.
addresses
- List of host/port addresses for all processes in this cluster (including this one).
proc_id
- Index of this process in cluster; starts from 0.
epoch_interval : datetime.timedelta
- System time length of each epoch. Defaults to 10 seconds.
recovery_config : RecoveryConfig
- State recovery config. If None, state will not be persisted.
worker_count_per_proc
- Number of worker threads to start on each process.
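For a test that exercises an actual multi-process cluster, each process calls cluster_main with the same full address list and its own proc_id. A hedged sketch using the standard library's multiprocessing; the port numbers and the _start_proc helper are illustrative assumptions, not bytewax API:

from multiprocessing import Process

from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput, cluster_main

# Illustrative: any two free ports on localhost will do.
ADDRESSES = ["localhost:2101", "localhost:2102"]

def _start_proc(proc_id):
    flow = Dataflow()
    flow.input("inp", TestingInput(range(10)))
    flow.capture(StdOutput())
    # Same address list everywhere; a distinct proc_id per process.
    cluster_main(flow, ADDRESSES, proc_id, worker_count_per_proc=1)

if __name__ == "__main__":
    procs = [Process(target=_start_proc, args=(i,)) for i in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()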
def ffwd_iter(it: Iterator[Any], n: int) -> None
-
Skip an iterator forward some number of items.
Args
it
- A stateful iterator to advance.
n
- Number of items to skip from the current position.
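A small usage sketch: the iterator is advanced in place, so the next read continues after the skipped items.

from bytewax.testing import ffwd_iter

it = iter(range(10))
ffwd_iter(it, 3)
print(next(it))  # 3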
def poll_next_batch(source: StatefulSource, timeout=datetime.timedelta(seconds=5))
-
Repeatedly poll an input source until it returns a batch.
You'll want to use this in unit tests of sources when there's some non-determinism in how items are read.
This is a busy-loop.
Args
source
- To call StatefulSource.next_batch on.
timeout
- How long to continuously poll for.
Returns
The next batch found.
Raises
TimeoutError
- If no batch was returned within the timeout.
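For illustration, a hedged sketch of testing a source that needs several polls before any items arrive; the _FlakySource class is invented for this example:

from bytewax.inputs import StatefulSource
from bytewax.testing import poll_next_batch

class _FlakySource(StatefulSource):
    """Invented for this example: empty until the third poll."""

    def __init__(self):
        self._polls = 0

    def next_batch(self):
        self._polls += 1
        # Mimic a slow external system that has nothing at first.
        return ["ready"] if self._polls >= 3 else []

    def snapshot(self):
        return None

print(poll_next_batch(_FlakySource()))  # ['ready']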
def run_main(flow, *, epoch_interval=None, recovery_config=None)
-
Execute a dataflow in the current thread.
Blocks until execution is complete.
This is only used for unit testing. See bytewax.run.

>>> from bytewax.dataflow import Dataflow
>>> from bytewax.testing import TestingInput, run_main
>>> from bytewax.connectors.stdio import StdOutput
>>> flow = Dataflow()
>>> flow.input("inp", TestingInput(range(3)))
>>> flow.capture(StdOutput())
>>> run_main(flow)
0
1
2
Args
flow
- Dataflow to run.
epoch_interval : datetime.timedelta
- System time length of each epoch. Defaults to 10 seconds.
recovery_config : RecoveryConfig
- State recovery config. If None, state will not be persisted.
Classes
class TestingInput (ib: Iterable[Any], batch_size: int = 1)
-
Produce input from a Python iterable.
You only want to use this for unit testing.
The iterable must be identical on all workers.
There is no parallelism; only one worker will actually consume the iterable.
Be careful using a generator as the iterable; if you fail and attempt to resume the dataflow without rebuilding it, the half-consumed generator will be re-used on recovery. Its early items are already gone, so the resumed execution will skip past the wrong items and will not see the correct data.
Init.
Args
ib
- Iterable for input.
batch_size
- Number of items from the iterable to emit in each batch. Defaults to 1.
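The usual unit-test pattern pairs TestingInput with run_main and a TestingOutput collecting into a plain list; a minimal sketch:

from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput, TestingOutput, run_main

out = []
flow = Dataflow()
flow.input("inp", TestingInput(range(3), batch_size=2))
flow.map(lambda x: x * 2)
flow.capture(TestingOutput(out))

run_main(flow)
assert sorted(out) == [0, 2, 4]
# Pass a list (or other re-iterable) rather than a generator so a
# rebuilt dataflow re-reads from the start (see the warning above).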
Ancestors
- PartitionedInput
- Input
- abc.ABC
Methods
def build_part(self, for_key, resume_state)
-
See ABC docstring.
def list_parts(self)
-
The iterable is read on a single worker.
class TestingOutput (ls)
-
Append each output item to a list.
You only want to use this for unit testing.
Can support at-least-once processing. The list is not cleared between executions.
Init.
Args
ls
- List to append to.
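Because the list is not cleared between executions, a second run appends rather than overwrites, which is what lets tests model at-least-once delivery; a small sketch:

from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput, TestingOutput, run_main

out = []
flow = Dataflow()
flow.input("inp", TestingInput(range(2)))
flow.capture(TestingOutput(out))

run_main(flow)
run_main(flow)  # Re-running appends to the same list.
print(out)  # [0, 1, 0, 1]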
Ancestors
- DynamicOutput
- Output
- abc.ABC
Methods
def build(self, worker_index, worker_count)
-
See ABC docstring.