Module bytewax.testing

Helper tools for testing dataflows.

Expand source code
"""Helper tools for testing dataflows."""
from datetime import datetime, timedelta, timezone
from itertools import islice
from typing import Any, Iterable, Iterator

from bytewax.inputs import (
    PartitionedInput,
    StatefulSource,
    batch,
)
from bytewax.outputs import DynamicOutput, StatelessSink

from .bytewax import cluster_main, run_main

__all__ = [
    "run_main",
    "cluster_main",
    "ffwd_iter",
    "poll_next_batch",
    "TestingInput",
    "TestingOutput",
]


def ffwd_iter(it: Iterator[Any], n: int) -> None:
    """Skip an iterator forward some number of items.

    Args:
        it:
            A stateful iterator to advance.
        n:
            Number of items to skip from the current position.

    """
    # Taken from `consume`
    # https://docs.python.org/3/library/itertools.html#itertools-recipes
    # Apparently faster than a for loop.
    next(islice(it, n, n), None)


class _IterSource(StatefulSource):
    def __init__(self, ib, batch_size, resume_state):
        self._start_idx = 0 if resume_state is None else resume_state
        it = iter(ib)
        # Resume to one after the last completed read index.
        ffwd_iter(it, self._start_idx)
        self._batcher = batch(it, batch_size)

    def next_batch(self):
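        # `next` raises StopIteration once the wrapped iterable is
        # exhausted, which signals that this source is complete.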
        batch = next(self._batcher)
        self._start_idx += len(batch)
        return batch

    def snapshot(self):
        return self._start_idx


class TestingInput(PartitionedInput):
    """Produce input from a Python iterable.

    You only want to use this for unit testing.

    The iterable must be identical on all workers.

    There is no parallelism; only one worker will actually consume the
    iterable.

    Be careful using a generator as the iterable; if you fail and
    attempt to resume the dataflow without rebuilding it, the
    half-consumed generator will be re-used on recovery and
    fast-forwarded again, so early input will be lost and resume
    will not see the correct data.

    """

    __test__ = False

    def __init__(self, ib: Iterable[Any], batch_size: int = 1):
        """Init.

        Args:
            ib:
                Iterable for input.
            batch_size:
                Number of items from the iterable to emit in each
                batch. Defaults to 1.

        """
        self._ib = ib
        self._batch_size = batch_size

    def list_parts(self):
        """The iterable is read on a single worker."""
        return ["iterable"]

    def build_part(self, for_key, resume_state):
        """See ABC docstring."""
        assert for_key == "iterable"
        return _IterSource(self._ib, self._batch_size, resume_state)


class _ListSink(StatelessSink):
    def __init__(self, ls):
        self._ls = ls

    def write_batch(self, items):
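        # Extend the caller's list in place so the same list object
        # accumulates all output.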
        self._ls += items


class TestingOutput(DynamicOutput):
    """Append each output item to a list.

    You only want to use this for unit testing.

    Can support at-least-once processing. The list is not cleared
    between executions.

    """

    __test__ = False

    def __init__(self, ls):
        """Init.

        Args:
            ls: List to append to.
        """
        self._ls = ls

    def build(self, worker_index, worker_count):
        """See ABC docstring."""
        return _ListSink(self._ls)


def poll_next_batch(source: StatefulSource, timeout=timedelta(seconds=5)):
    """Repeatedly poll an input source until it returns a batch.

    You'll want to use this in unit tests of sources when there's some
    non-determinism in how items are read.

    This is a busy-loop.

    Args:
        source: To call `StatefulSource.next_batch` on.

        timeout: How long to continuously poll for.

    Returns:
        The next batch found.

    Raises:
        TimeoutError: If no batch was returned within the timeout.

    """
    batch = []
    start = datetime.now(timezone.utc)
    while len(batch) <= 0:
        if datetime.now(timezone.utc) - start > timeout:
            raise TimeoutError()
        batch = source.next_batch()
    return batch

Functions

def cluster_main(flow, addresses, proc_id, *, epoch_interval=None, recovery_config=None, worker_count_per_proc=1)

Execute a dataflow in the current process as part of a cluster.

This is only used for unit testing. See bytewax.run.

Blocks until execution is complete.

>>> from bytewax.dataflow import Dataflow
>>> from bytewax.testing import TestingInput
>>> from bytewax.connectors.stdio import StdOutput
>>> flow = Dataflow()
>>> flow.input("inp", TestingInput(range(3)))
>>> flow.capture(StdOutput())
>>> addresses = []  # In a real example, you'd find the "host:port" of all other Bytewax workers.
>>> proc_id = 0  # In a real example, you'd assign each worker a distinct ID from 0..proc_count.
>>> cluster_main(flow, addresses, proc_id)
0
1
2

Args

flow
Dataflow to run.
addresses
List of host/port addresses for all processes in this cluster (including this one).
proc_id
Index of this process in cluster; starts from 0.
epoch_interval : datetime.timedelta
System time length of each epoch. Defaults to 10 seconds.
recovery_config : RecoveryConfig
State recovery config. If None, state will not be persisted.
worker_count_per_proc
Number of worker threads to start on each process.
def ffwd_iter(it: Iterator[Any], n: int) -> None

Skip an iterator forward some number of items.

Args

it
A stateful iterator to advance.
n
Number of items to skip from the current position.
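
For example, skipping the first three items of an iterator:

>>> from bytewax.testing import ffwd_iter
>>> it = iter(range(10))
>>> ffwd_iter(it, 3)
>>> next(it)
3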

def poll_next_batch(source: StatefulSource, timeout=datetime.timedelta(seconds=5))

Repeatedly poll an input source until it returns a batch.

You'll want to use this in unit tests of sources when there's some non-determinism in how items are read.

This is a busy-loop.

Args

source
To call StatefulSource.next_batch on.
timeout
How long to continuously poll for.

Returns

The next batch found.

Raises

TimeoutError
If no batch was returned within the timeout.
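
As a sketch, you might poll a source whose first few reads return nothing. FlakySource below is a hypothetical stand-in; only its `next_batch` method is called:

>>> from bytewax.testing import poll_next_batch
>>> class FlakySource:
...     def __init__(self):
...         self.polls = 0
...     def next_batch(self):
...         # Empty batches for the first two polls, then a real one.
...         self.polls += 1
...         return ["item"] if self.polls >= 3 else []
>>> poll_next_batch(FlakySource())
['item']
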
def run_main(flow, *, epoch_interval=None, recovery_config=None)

Execute a dataflow in the current thread.

Blocks until execution is complete.

This is only used for unit testing. See bytewax.run.

>>> from bytewax.dataflow import Dataflow
>>> from bytewax.testing import TestingInput, run_main
>>> from bytewax.connectors.stdio import StdOutput
>>> flow = Dataflow()
>>> flow.input("inp", TestingInput(range(3)))
>>> flow.capture(StdOutput())
>>> run_main(flow)
0
1
2

Args

flow
Dataflow to run.
epoch_interval : datetime.timedelta
System time length of each epoch. Defaults to 10 seconds.
recovery_config : RecoveryConfig
State recovery config. If None, state will not be persisted.

Classes

class TestingInput (ib: Iterable[Any], batch_size: int = 1)

Produce input from a Python iterable.

You only want to use this for unit testing.

The iterable must be identical on all workers.

There is no parallelism; only one worker will actually consume the iterable.

Be careful using a generator as the iterable; if you fail and attempt to resume the dataflow without rebuilding it, the half-consumed generator will be re-used on recovery and fast-forwarded again, so early input will be lost and resume will not see the correct data.

Init.

Args

ib
Iterable for input.
batch_size
Number of items from the iterable to emit in each batch. Defaults to 1.
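
For example, driving a small dataflow in a unit test, mirroring the run_main example above (batch_size only changes how items are grouped into batches internally, not what flows downstream):

>>> from bytewax.dataflow import Dataflow
>>> from bytewax.testing import TestingInput, run_main
>>> from bytewax.connectors.stdio import StdOutput
>>> flow = Dataflow()
>>> flow.input("inp", TestingInput(range(3), batch_size=2))
>>> flow.capture(StdOutput())
>>> run_main(flow)
0
1
2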

Ancestors

bytewax.inputs.PartitionedInput

Methods

def build_part(self, for_key, resume_state)

See ABC docstring.

def list_parts(self)

The iterable is read on a single worker.

class TestingOutput (ls)

Append each output item to a list.

You only want to use this for unit testing.

Can support at-least-once processing. The list is not cleared between executions.

Init.

Args

ls
List to append to.
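
For example, capturing a dataflow's output into a list you can assert on:

>>> from bytewax.dataflow import Dataflow
>>> from bytewax.testing import TestingInput, TestingOutput, run_main
>>> flow = Dataflow()
>>> flow.input("inp", TestingInput(range(3)))
>>> out = []
>>> flow.capture(TestingOutput(out))
>>> run_main(flow)
>>> out
[0, 1, 2]
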
Ancestors

bytewax.outputs.DynamicOutput

Methods

def build(self, worker_index, worker_count)

See ABC docstring.
