Module bytewax.outputs

Low-level output interfaces.

If you want pre-built connectors for various external systems, see bytewax.connectors. That is also a rich source of examples.

Subclass the types here to implement output for your own custom sink.
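
For orientation, here is a minimal sketch of wiring an output step into a dataflow. It assumes the Dataflow.input/Dataflow.output API and the pre-built TestingInput and StdOutput connectors; verify the names against your installed bytewax version.

from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput

flow = Dataflow()
flow.input("inp", TestingInput(range(5)))
# Each item flowing out of this step is handed to the output's sink.
flow.map(lambda x: x * 2)
flow.output("out", StdOutput())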

Expand source code
"""Low-level output interfaces.

If you want pre-built connectors for various external systems, see
`bytewax.connectors`. That is also a rich source of examples.

Subclass the types here to implement output for your own custom sink.

"""

from abc import ABC, abstractmethod
from typing import Any, List, Optional
from zlib import adler32

__all__ = [
    "DynamicOutput",
    "Output",
    "PartitionedOutput",
    "StatefulSink",
    "StatelessSink",
]


class Output(ABC):  # noqa: B024
    """Base class for all output types. Do not subclass this.

    If you want to implement a custom connector, instead subclass one
    of the specific output sub-types below in this module.

    """

    def __json__(self):
        """JSON representation of this.

        This is used by the Bytewax platform internally and should
        not be overridden.

        """
        return {
            "type": type(self).__name__,
        }


class StatefulSink(ABC):
    """Output sink that maintains state of its position."""

    @abstractmethod
    def write_batch(self, values: List[Any]) -> None:
        """Write a batch of output values.

        Called with a list of the `value`s for each `(key, value)`
        pair at this point in the dataflow.

        See `PartitionedOutput.part_fn` for how the key is mapped to
        a partition.

        Args:
            values:
                Values in the dataflow. Non-deterministically batched.

        """
        ...

    def snapshot(self) -> Any:
        """Snapshot the position of the next write of this sink.

        This will be returned to you via the `resume_state` parameter
        of your output builder.

        Be careful of "off by one" errors in resume state. This should
        return a state that, when built into a sink, resumes writing
        _after the last written item_, not overwriting the same item.

        This is guaranteed to never be called after `close()`.

        Returns:
            Resume state.

        """
        return None

    def close(self) -> None:
        """Cleanup this sink when the dataflow completes.

        This is not guaranteed to be called. It will only be called
        when the dataflow finishes on finite input. It will not be
        called during an abrupt or abort shutdown.

        """
        return


class PartitionedOutput(Output):
    """An output with a fixed number of independent partitions.

    Will maintain the state of each sink and re-build using it during
    resume. If the sink supports seeking and overwriting, this output
    can support exactly-once processing.

    """

    @abstractmethod
    def list_parts(self) -> List[str]:
        """List all local partitions this worker has access to.

        You do not need to list all partitions globally.

        Returns:
            Local partition keys.

        """
        ...

    def part_fn(self, item_key: str) -> int:
        """Route incoming `(key, value)` pairs to partitions.

        Defaults to `zlib.adler32` as a simple consistent function.

        This must be globally consistent across workers and executions
        and return the same hash on every call.

        A specific partition is chosen by using this value as a
        wrapped (modulo) index into the ordered global set of
        partitions. (Not just the partitions local to this worker.)

        .. caution:: Do not use Python's built-in `hash` function
            here! It is [_not consistent between processes by
            default_](https://docs.python.org/3/using/cmdline.html#cmdoption-R)
            and using it will cause incorrect partitioning in cluster
            executions.

        Args:
            item_key:
                Key for the value that is about to be written.

        Returns:
            Integer hash value that is used to assign partition.

        """
        return adler32(item_key.encode())

    @abstractmethod
    def build_part(
        self,
        for_part: str,
        resume_state: Optional[Any],
    ) -> StatefulSink:
        """Build anew or resume an output partition.

        Will be called once per execution for each partition key on a
        worker that reported that partition was local in `list_parts`.

        Do not pre-build state about a partition in the
        constructor. All state must be derived from `resume_state` for
        recovery to work properly.

        Args:
            for_part:
                Which partition to build. Will always be one of the
                keys returned by `list_parts` on this worker.

            resume_state:
                State data containing where in the output stream this
                partition should begin writing during this
                execution.

        Returns:
            The built partition.

        """
        ...


class StatelessSink(ABC):
    """Output sink that is stateless."""

    @abstractmethod
    def write_batch(self, items: List[Any]) -> None:
        """Write a batch of output items.

        Called multiple times whenever new items are seen at this
        point in the dataflow.

        Args:
            items:
                Items in the dataflow. Non-deterministically batched.

        """
        ...

    def close(self) -> None:
        """Cleanup this sink when the dataflow completes.

        This is not guaranteed to be called. It will only be called
        when the dataflow finishes on finite input. It will not be
        called during an abrupt or abort shutdown.

        """
        return


class DynamicOutput(Output):
    """An output where all workers write items concurrently.

    Does not support storing any resume state. Thus this kind of
    output can naively support only at-least-once processing.

    """

    @abstractmethod
    def build(self, worker_index, worker_count) -> StatelessSink:
        """Build an output sink for a worker.

        Will be called once on each worker.

        Args:
            worker_index:
                Index of this worker.
            worker_count:
                Total number of workers.

        Returns:
            Output sink.

        """
        ...

Classes

class DynamicOutput

An output where all workers write items concurrently.

Does not support storing any resume state. Thus this kind of output can naively support only at-least-once processing.
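
As an illustration, here is a hedged sketch of a dynamic output in which each worker appends items to its own local file so concurrent writes never collide. The class and file-naming scheme are invented for this example; they are not part of bytewax.

from typing import Any, List

from bytewax.outputs import DynamicOutput, StatelessSink


class _FileAppendSink(StatelessSink):
    """Hypothetical sink that appends each item to a per-worker file."""

    def __init__(self, path):
        self._f = open(path, "a")

    def write_batch(self, items: List[Any]) -> None:
        for item in items:
            self._f.write(f"{item}\n")
        self._f.flush()

    def close(self) -> None:
        self._f.close()


class FileAppendOutput(DynamicOutput):
    """Hypothetical output: every worker writes to its own file."""

    def __init__(self, dir_path):
        self._dir_path = dir_path

    def build(self, worker_index, worker_count) -> StatelessSink:
        # One file per worker, so no coordination between workers is
        # needed and no resume state is kept.
        return _FileAppendSink(f"{self._dir_path}/worker_{worker_index}.txt")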


Ancestors

  • bytewax.outputs.Output
  • abc.ABC

Methods

def build(self, worker_index, worker_count) ‑> StatelessSink

Build an output sink for a worker.

Will be called once on each worker.

Args

worker_index: Index of this worker.

worker_count: Total number of workers.

Returns

Output sink.

class Output

Base class for all output types. Do not subclass this.

If you want to implement a custom connector, instead subclass one of the specific output sub-types below in this module.


Ancestors

  • abc.ABC

Subclasses

  • bytewax.outputs.DynamicOutput
  • bytewax.outputs.PartitionedOutput

class PartitionedOutput

An output with a fixed number of independent partitions.

Will maintain the state of each sink and re-build using it during resume. If the sink supports seeking and overwriting, this output can support exactly-once processing.
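
For example, here is a hedged sketch of a partitioned output that appends lines to one local file per partition and snapshots the byte offset of the next write; truncating back to that offset on resume is what lets replayed items overwrite earlier copies instead of duplicating them. All names here are hypothetical.

import os
from typing import Any, List, Optional

from bytewax.outputs import PartitionedOutput, StatefulSink


class _OffsetFileSink(StatefulSink):
    """Hypothetical sink: appends lines to a file, snapshots the offset."""

    def __init__(self, path, resume_offset):
        self._f = open(path, "ab")
        # Truncate back to the resume point so items replayed after a
        # crash overwrite their previous copies instead of duplicating.
        self._f.truncate(resume_offset if resume_offset is not None else 0)
        self._f.seek(0, os.SEEK_END)

    def write_batch(self, values: List[Any]) -> None:
        for value in values:
            self._f.write(f"{value}\n".encode())
        self._f.flush()

    def snapshot(self) -> Any:
        # Byte offset of the *next* write, so a resumed sink continues
        # after the last written item.
        return self._f.tell()

    def close(self) -> None:
        self._f.close()


class DirOutput(PartitionedOutput):
    """Hypothetical output: one partition per file already in a directory."""

    def __init__(self, dir_path):
        self._dir_path = dir_path

    def list_parts(self) -> List[str]:
        # Only the partitions this worker can see locally.
        return sorted(os.listdir(self._dir_path))

    def build_part(
        self,
        for_part: str,
        resume_state: Optional[Any],
    ) -> StatefulSink:
        return _OffsetFileSink(os.path.join(self._dir_path, for_part), resume_state)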


Ancestors

  • bytewax.outputs.Output
  • abc.ABC

Methods

def build_part(self, for_part: str, resume_state: Optional[Any]) ‑> StatefulSink

Build anew or resume an output partition.

Will be called once per execution for each partition key on a worker that reported that partition was local in list_parts.

Do not pre-build state about a partition in the constructor. All state must be derived from resume_state for recovery to work properly.

Args

for_part: Which partition to build. Will always be one of the keys returned by list_parts on this worker.

resume_state: State data containing where in the output stream this partition should begin writing during this execution.

Returns

The built partition.

def list_parts(self) ‑> List[str]

List all local partitions this worker has access to.

You do not need to list all partitions globally.

Returns

Local partition keys.

def part_fn(self, item_key: str) ‑> int

Route incoming (key, value) pairs to partitions.

Defaults to zlib.adler32 as a simple consistent function.

This must be globally consistent across workers and executions and return the same hash on every call.

A specific partition is chosen by using this value as a wrapped (modulo) index into the ordered global set of partitions. (Not just the partitions local to this worker.)

Caution: Do not use Python's built-in hash function here! It is not consistent between processes by default and using it will cause incorrect partitioning in cluster executions.

Args

item_key: Key for the value that is about to be written.

Returns

Integer hash value that is used to assign partition.
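
Concretely, the routing works roughly like this sketch (the partition names are invented): the hash is wrapped modulo the number of global partitions, so every worker maps the same key to the same partition.

from zlib import adler32

# Ordered global set of partitions; must be identical on every worker.
global_parts = ["part-0", "part-1", "part-2", "part-3"]

key = "user-1234"
# adler32 depends only on the key's bytes, so it returns the same hash
# in every process and execution; Python's built-in hash() is salted
# per process by default and does not.
routing_hash = adler32(key.encode())
part = global_parts[routing_hash % len(global_parts)]
print(part)  # Always the same partition for "user-1234".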

class StatefulSink

Output sink that maintains state of its position.


Ancestors

  • abc.ABC

Subclasses

  • bytewax.connectors.files._FileSink

Methods

def close(self) ‑> None

Cleanup this sink when the dataflow completes.

This is not guaranteed to be called. It will only be called when the dataflow finishes on finite input. It will not be called during an abrupt or abort shutdown.

def snapshot(self) ‑> Any

Snapshot the position of the next write of this sink.

This will be returned to you via the resume_state parameter of your output builder.

Be careful of "off by one" errors in resume state. This should return a state that, when built into a sink, resumes writing after the last written item, not overwriting the same item.

This is guaranteed to never be called after close().

Returns

Resume state.
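
To make the off-by-one rule concrete, here is a hedged sketch of a list-backed sink (all names hypothetical): after items 0 through 9 have been written, snapshot must return 10, the index of the next write.

from typing import Any, List

from bytewax.outputs import StatefulSink


class _ListBackedSink(StatefulSink):
    """Hypothetical sink that appends into a shared list."""

    def __init__(self, storage: List[Any], resume_state):
        self._storage = storage
        # On resume, truncate to the next-write index so replayed
        # items overwrite rather than duplicate.
        next_i = resume_state if resume_state is not None else 0
        del self._storage[next_i:]

    def write_batch(self, values: List[Any]) -> None:
        self._storage.extend(values)

    def snapshot(self) -> Any:
        # Index of the *next* write. Returning len(self._storage) - 1
        # here would re-write the final item after every resume.
        return len(self._storage)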

def write_batch(self, values: List[Any]) ‑> None

Write a batch of output values.

Called with a list of the values for each (key, value) pair at this point in the dataflow.

See PartitionedOutput.part_fn() for how the key is mapped to a partition.

Args

values: Values in the dataflow. Non-deterministically batched.

class StatelessSink

Output sink that is stateless.
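
A minimal hedged sketch, assuming nothing beyond this abstract class: a sink that prints each item and needs no cleanup.

from typing import Any, List

from bytewax.outputs import StatelessSink


class _StdoutSink(StatelessSink):
    """Hypothetical sink that prints each item as it is written."""

    def write_batch(self, items: List[Any]) -> None:
        for item in items:
            print(item, flush=True)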


Ancestors

  • abc.ABC

Subclasses

  • bytewax.connectors.kafka._KafkaSink
  • bytewax.connectors.stdio._PrintSink
  • bytewax.testing._ListSink

Methods

def close(self) ‑> None

Cleanup this sink when the dataflow completes.

This is not guaranteed to be called. It will only be called when the dataflow finishes on finite input. It will not be called during an abrupt or abort shutdown.

def write_batch(self, items: List[Any]) ‑> None

Write a batch of output items.

Called multiple times whenever new items are seen at this point in the dataflow.

Args

items: Items in the dataflow. Non-deterministically batched.
