Module bytewax.outputs

Dataflow output sinks.

Bytewax provides pre-packaged output configuration options for common sinks you might want to write dataflow output to.

Use

Create an instance of the OutputConfig subclass for the sink you want to write to, then pass that config object to the Dataflow.capture() operator.

Source code
"""Dataflow output sinks.

Bytewax provides pre-packaged output configuration options for common
sinks you might want to write dataflow output to.

Use
---

Create an `OutputConfig` subclass for the sink you want to write to.
Then pass that config object to the `bytewax.dataflow.Dataflow.capture`
operator.
"""

from .bytewax import (  # noqa: F401
    KafkaOutputConfig,
    ManualEpochOutputConfig,
    ManualOutputConfig,
    OutputConfig,
    StdOutputConfig,
)


class TestingEpochOutputConfig(ManualEpochOutputConfig):
    """
    Append each output `(epoch, item)` to a list.
    You only want to use this for unit testing.

    Because the list is in-memory, you will need to carefully
    coordinate access to and assertions on this list when using
    multiple workers.

    Args:

        ls: Append each `(epoch, item)` to this list.

    Returns:

        Config object.
        Pass this to the `bytewax.dataflow.Dataflow.capture` operator.

    """

    # This is needed to avoid pytest trying to load this class as
    # a test since its name starts with "Test"
    __test__ = False

    def __new__(cls, ls):
        """
        In classes defined by PyO3 we can only use __new__, not __init__
        """
        return super().__new__(cls, lambda wi, wn: ls.append)


class TestingOutputConfig(ManualOutputConfig):
    """
    Append each output item to a list.
    You only want to use this for unit testing.

    Because the list is in-memory, you will need to carefully
    coordinate access to and assertions on this list when using
    multiple workers.

    Args:

        ls: Append each `item` to this list.

    Returns:

        Config object.
        Pass this to the `bytewax.dataflow.Dataflow.capture` operator.

    """

    # This is needed to avoid pytest trying to load this class as
    # a test since its name starts with "Test"
    __test__ = False

    def __new__(cls, ls):
        """
        In classes defined by PyO3 we can only use __new__, not __init__
        """
        return super().__new__(cls, lambda wi, wn: ls.append)

Classes

class KafkaOutputConfig (brokers, topic, additional_properties)

Use Kafka as the output.

A capture using KafkaOutputConfig expects to receive items structured as two-tuples of bytes, (key, payload), which form a Kafka record. The key may be None.

Args

brokers : List[str]
List of host:port strings of Kafka brokers.
topic : str
Topic to which producer will send records.
additional_properties : dict
Any additional configuration properties. Note that consumer group settings will be ignored. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for more options.

Returns

Config object. Pass this to the Dataflow.capture() operator.
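Since KafkaOutputConfig expects each captured item to already be a (key, payload) two-tuple of bytes, items usually need serializing before the capture step. A minimal sketch of such a serializer, assuming items arrive as (key, value) pairs with string keys and JSON-serializable values; `to_kafka_record` is a hypothetical helper name, not part of bytewax.

```python
import json
from typing import Any, Optional, Tuple


def to_kafka_record(item: Tuple[Optional[str], Any]) -> Tuple[Optional[bytes], bytes]:
    # Hypothetical helper (not part of bytewax): convert a (key, value) item
    # into the (key_bytes, payload_bytes) tuple KafkaOutputConfig expects.
    key, value = item
    # Kafka keys may be None; otherwise encode the string key as UTF-8.
    key_bytes = key.encode("utf-8") if key is not None else None
    # Assumption: payloads are JSON-serializable and sent as UTF-8 JSON.
    payload_bytes = json.dumps(value, sort_keys=True).encode("utf-8")
    return key_bytes, payload_bytes


print(to_kafka_record(("user-1", {"clicks": 3})))  # (b'user-1', b'{"clicks": 3}')
print(to_kafka_record((None, [1, 2])))  # (None, b'[1, 2]')
```

You would apply a function like this to every item ahead of the capture step, so the Kafka sink only ever sees bytes.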

Ancestors

OutputConfig

Instance variables

var additional_properties

The additional Kafka configuration properties passed to the constructor.

var brokers

The list of host:port broker strings passed to the constructor.

var topic

The topic to which the producer sends records.

class ManualEpochOutputConfig (output_builder)

Call a Python callback function with each output epoch and item.

You probably want to use ManualOutputConfig unless you know you need specific epoch assignments for deep integration work.

Args

output_builder
output_builder(worker_index: int, worker_count: int) => output_handler(epoch_item: Tuple[int, Any])
Builder function which returns a handler function for each worker thread. The handler is called with (epoch, item) whenever an item passes through this capture operator on this worker.

Returns

Config object. Pass this to the Dataflow.capture() operator.
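The builder contract above can be sketched in plain Python: the builder runs once per worker and returns a handler that receives a single (epoch, item) tuple. The `format_record` helper and the two-worker loop at the bottom are hypothetical stand-ins for illustration, not bytewax's runtime.

```python
from typing import Any, Callable, Tuple

EpochItem = Tuple[int, Any]


def format_record(worker_index: int, epoch_item: EpochItem) -> str:
    # Hypothetical formatting helper: unpack the (epoch, item) tuple.
    epoch, item = epoch_item
    return f"worker={worker_index} epoch={epoch} item={item!r}"


def output_builder(worker_index: int, worker_count: int) -> Callable[[EpochItem], None]:
    # Called once per worker; returns the handler that worker will use.
    def output_handler(epoch_item: EpochItem) -> None:
        # Called with one (epoch, item) tuple per item captured on this worker.
        print(format_record(worker_index, epoch_item))

    return output_handler


# Hypothetical stand-in for the runtime: two workers, each with its own handler.
handlers = [output_builder(i, 2) for i in range(2)]
handlers[0]((0, "a"))  # prints: worker=0 epoch=0 item='a'
handlers[1]((0, "b"))  # prints: worker=1 epoch=0 item='b'
handlers[0]((1, "c"))  # prints: worker=0 epoch=1 item='c'
```

With bytewax installed, a function of this shape is what you would pass as ManualEpochOutputConfig(output_builder).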

Ancestors

OutputConfig

Subclasses

TestingEpochOutputConfig

class ManualOutputConfig (output_builder)

Call a Python callback function with each output item.

Args

output_builder
output_builder(worker_index: int, worker_count: int) => output_handler(item: Any)
Builder function which returns a handler function for each worker thread. The handler is called with the item whenever an item passes through this capture operator on this worker.

Returns

Config object. Pass this to the Dataflow.capture() operator.
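Because the builder is called separately for each worker, it is a natural place to set up per-worker resources. A minimal sketch that gives every worker its own output file, so no two workers share a file handle; `OUT_DIR` and the `part-<index>-of-<count>.txt` naming are assumptions for illustration, and the loop at the bottom is a hypothetical stand-in for the runtime.

```python
import os
import tempfile
from typing import Any, Callable

# Assumption: write output under a fresh temporary directory for this sketch.
OUT_DIR = tempfile.mkdtemp()


def output_builder(worker_index: int, worker_count: int) -> Callable[[Any], None]:
    # Each worker opens its own file, so workers never contend for one handle.
    path = os.path.join(OUT_DIR, f"part-{worker_index}-of-{worker_count}.txt")
    f = open(path, "a", encoding="utf-8")

    def output_handler(item: Any) -> None:
        # Called with each item that reaches the capture operator on this worker.
        f.write(f"{item}\n")
        # The handler interface has no close hook, so flush after every write
        # and let the file close when the process exits.
        f.flush()

    return output_handler


# Hypothetical stand-in for the runtime with two workers.
handlers = [output_builder(i, 2) for i in range(2)]
handlers[0]("apple")
handlers[1]("banana")
handlers[0]("cherry")
```

Worker 0's file ends up holding `apple` and `cherry`, and worker 1's holds `banana`.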

Ancestors

OutputConfig

Subclasses

TestingOutputConfig

Instance variables

var output_builder

The builder function passed to the constructor.

class OutputConfig

Base class for an output config.

These define how a certain stream of data should be output.

Use a specific subclass of this that matches the output destination you'd like to write to.

Subclasses

KafkaOutputConfig
ManualEpochOutputConfig
ManualOutputConfig
StdOutputConfig

class StdOutputConfig

Write the output items to standard out.

Items must have a valid __str__. If not, map the items into a string before capture.

Returns

Config object. Pass this to the Dataflow.capture() operator.
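Every Python object inherits a default `__str__`, but for a plain class it only shows the type name and memory address, which is rarely useful on standard out. A minimal sketch of mapping such items into strings before capture; `Reading` and `format_reading` are hypothetical names for illustration.

```python
class Reading:
    # Hypothetical item type with no __str__ or __repr__ of its own.
    def __init__(self, sensor: str, value: float) -> None:
        self.sensor = sensor
        self.value = value


def format_reading(reading: Reading) -> str:
    # Map each item to a readable string before it reaches StdOutputConfig.
    return f"{reading.sensor}={reading.value:.1f}"


r = Reading("temp", 21.456)
print(str(r))  # e.g. <__main__.Reading object at 0x...>
print(format_reading(r))  # temp=21.5
```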

Ancestors

OutputConfig

class TestingEpochOutputConfig (ls)

Append each output (epoch, item) to a list. Only use this for unit testing.

Because the list is in-memory, you will need to carefully coordinate access to and assertions on this list when using multiple workers.

Args

ls
Append each (epoch, item) to this list.

Returns

Config object. Pass this to the Dataflow.capture() operator.
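The `__new__` shown in the module source passes `lambda wi, wn: ls.append` as the builder. A sketch of what that does in a simulated two-worker run: the builder ignores its worker arguments, so every worker's handler is the same list's bound `append`, and all (epoch, item) pairs land in one shared list. The `builder` function and the handler calls are hypothetical stand-ins for the runtime.

```python
from typing import Any, Callable, List, Tuple

EpochItem = Tuple[int, Any]
out: List[EpochItem] = []


def builder(worker_index: int, worker_count: int) -> Callable[[EpochItem], None]:
    # Equivalent to `lambda wi, wn: ls.append`: the worker arguments are
    # ignored and every worker gets the same list's append method.
    return out.append


# Hypothetical stand-in for two workers, each building its own handler.
h0 = builder(0, 2)
h1 = builder(1, 2)
h0((0, "a"))
h1((0, "b"))
h0((1, "c"))

print(out)  # [(0, 'a'), (0, 'b'), (1, 'c')]
```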

Ancestors

ManualEpochOutputConfig
OutputConfig

class TestingOutputConfig (ls)

Append each output item to a list. Only use this for unit testing.

Because the list is in-memory, you will need to carefully coordinate access to and assertions on this list when using multiple workers.

Args

ls
Append each item to this list.

Returns

Config object. Pass this to the Dataflow.capture() operator.
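With several worker threads appending to one shared list, the order in which items arrive depends on thread scheduling. A sketch of why assertions should compare a sorted copy (or a multiset) rather than the raw order; here the workers are simulated with `threading.Thread`, which is an assumption for illustration and not how bytewax itself spawns workers. In CPython, `list.append` is atomic, so no items are lost, only reordered.

```python
import threading
from typing import Any, Callable, List

out: List[Any] = []


def builder(worker_index: int, worker_count: int) -> Callable[[Any], None]:
    # Equivalent to TestingOutputConfig's `lambda wi, wn: ls.append`.
    return out.append


def run_worker(worker_index: int, worker_count: int, items: List[Any]) -> None:
    # Hypothetical stand-in for one worker thread emitting its share of items.
    handler = builder(worker_index, worker_count)
    for item in items:
        handler(item)


threads = [
    threading.Thread(target=run_worker, args=(0, 2, [1, 3, 5])),
    threading.Thread(target=run_worker, args=(1, 2, [2, 4, 6])),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Interleaving between workers is not deterministic, so assert on a
# sorted copy rather than on the raw order.
assert sorted(out) == [1, 2, 3, 4, 5, 6]
```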

Ancestors

ManualOutputConfig
OutputConfig

Inherited members

ManualOutputConfig: output_builder