Module bytewax.testing
Helper tools for testing dataflows.
Expand source code
"""Helper tools for testing dataflows.
"""
import multiprocessing.dummy
from contextlib import contextmanager
from threading import Lock
_print_lock = Lock()


def test_print(*args, **kwargs):
    """Print with an in-process lock so concurrent workers don't interleave.

    A drop-in replacement for `print()` for when you're integration
    testing a dataflow and want more deterministic output. Note that
    even with the lock, items emitted by multiple workers may still
    arrive "out-of-order" because the workers race each other; you
    probably want to sort your output in some way.

    All arguments are forwarded to `print()` unmodified.
    """
    # Acquire/release explicitly so the lock is held for exactly the
    # duration of the underlying print() call.
    _print_lock.acquire()
    try:
        # flush=True so the output is emitted while the lock is held,
        # not buffered and interleaved later.
        print(*args, flush=True, **kwargs)
    finally:
        _print_lock.release()
doctest_ctx = multiprocessing.dummy
"""Use this `multiprocessing` context when running in doctests.

Pass to `bytewax.spawn_cluster()` and `bytewax.run_cluster()`.

Spawning subprocesses is fraught in doctest contexts, so use this to
demonstrate the API works, but not actually run via multiple
processes. We have other normal `pytest` tests which actually test
behavior. Don't worry.
"""


def _Manager():
    """Context-manager shim for `multiprocessing.dummy.Manager()`.

    `multiprocessing.dummy.Manager()` doesn't support being a context
    manager like a real `multiprocessing.Manager()` does, so this
    replacement simply yields the dummy module itself.
    """
    yield doctest_ctx


# Wrap explicitly (rather than via decorator syntax) and monkey patch
# the dummy module so `doctest_ctx.Manager()` works in a `with` block
# just like the real multiprocessing context.
_Manager = contextmanager(_Manager)
doctest_ctx.Manager = _Manager
Global variables
var doctest_ctx
-
Use this `multiprocessing` context when running in doctests.

Pass to `bytewax.spawn_cluster()` and `bytewax.run_cluster()`.

Spawning subprocesses is fraught in doctest contexts, so use this to
demonstrate the API works, but not actually run via multiple processes.
We have other normal `pytest` tests which actually test behavior.
Don't worry.

Expand source code
# # Support for the API of the multiprocessing package using threads # # multiprocessing/dummy/__init__.py # # Copyright (c) 2006-2008, R Oudkerk # Licensed to PSF under a Contributor Agreement. # __all__ = [ 'Process', 'current_process', 'active_children', 'freeze_support', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event', 'Barrier', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue' ] # # Imports # import threading import sys import weakref import array from .connection import Pipe from threading import Lock, RLock, Semaphore, BoundedSemaphore from threading import Event, Condition, Barrier from queue import Queue # # # class DummyProcess(threading.Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): threading.Thread.__init__(self, group, target, name, args, kwargs) self._pid = None self._children = weakref.WeakKeyDictionary() self._start_called = False self._parent = current_process() def start(self): if self._parent is not current_process(): raise RuntimeError( "Parent is {0!r} but current_process is {1!r}".format( self._parent, current_process())) self._start_called = True if hasattr(self._parent, '_children'): self._parent._children[self] = None threading.Thread.start(self) @property def exitcode(self): if self._start_called and not self.is_alive(): return 0 else: return None # # # Process = DummyProcess current_process = threading.current_thread current_process()._children = weakref.WeakKeyDictionary() def active_children(): children = current_process()._children for p in list(children): if not p.is_alive(): children.pop(p, None) return list(children) def freeze_support(): pass # # # class Namespace(object): def __init__(self, /, **kwds): self.__dict__.update(kwds) def __repr__(self): items = list(self.__dict__.items()) temp = [] for name, value in items: if not name.startswith('_'): temp.append('%s=%r' % (name, value)) temp.sort() return '%s(%s)' % (self.__class__.__name__, ', '.join(temp)) dict = dict 
# NOTE(review): continuation of the stdlib `multiprocessing.dummy`
# source rendered by the doc generator; reformatted for readability.

list = list


def Array(typecode, sequence, lock=True):
    # `lock` is accepted for API compatibility but ignored: threads
    # share memory, so a plain array suffices.
    return array.array(typecode, sequence)


class Value(object):
    # Thread-backed stand-in for `multiprocessing.Value`; `lock` is
    # accepted for API compatibility but unused.
    def __init__(self, typecode, value, lock=True):
        self._typecode = typecode
        self._value = value

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, value):
        self._value = value

    def __repr__(self):
        return '<%s(%r, %r)>'%(type(self).__name__,self._typecode,self._value)


def Manager():
    # The dummy "manager" is just this module itself.
    return sys.modules[__name__]


def shutdown():
    pass


def Pool(processes=None, initializer=None, initargs=()):
    # Thread pool instead of a process pool.
    from ..pool import ThreadPool
    return ThreadPool(processes, initializer, initargs)


JoinableQueue = Queue
Functions
def test_print(*args, **kwargs)
-
A version of `print()` which takes an in-process lock to prevent
multiple worker threads from writing simultaneously, which results in
interleaved output.

You'd use this if you're integration testing a dataflow and want more
deterministic output. Remember that even with this, the items from
multi-worker output might be "out-of-order" because each worker is
racing each other. You probably want to sort your output in some way.

Arguments are passed through to `print()` unmodified.

Expand source code
def test_print(*args, **kwargs):
    """A version of `print()` which takes an in-process lock to prevent
    multiple worker threads from writing simultaneously which results
    in interleaved output.

    You'd use this if you're integration testing a dataflow and want
    more deterministic output. Remember that even with this, the items
    from multi-worker output might be "out-of-order" because each
    worker is racing each other. You probably want to sort your output
    in some way.

    Arguments are passed through to `print()` unmodified.
    """
    # Serialize writers; flush so the output is emitted while the lock
    # is still held rather than buffered and interleaved later.
    with _print_lock:
        print(*args, flush=True, **kwargs)