Module bytewax.outputs
Dataflow output sinks.
Bytewax provides pre-packaged output configuration options for common sinks you might want to write dataflow output to.
Use
Create an `OutputConfig` subclass for the sink you want to write to. Then pass that config object to the `Dataflow.capture()` operator.
```python
"""Dataflow output sinks.

Bytewax provides pre-packaged output configuration options for common
sinks you might want to write dataflow output to.

Use
---

Create an `OutputConfig` subclass for the sink you want to write to.
Then pass that config object to the `bytewax.dataflow.Dataflow.capture`
operator.
"""
from .bytewax import (  # noqa: F401
    KafkaOutputConfig,
    ManualEpochOutputConfig,
    ManualOutputConfig,
    OutputConfig,
    StdOutputConfig,
)


class TestingEpochOutputConfig(ManualEpochOutputConfig):
    """
    Append each output `(epoch, item)` to a list.

    You only want to use this for unit testing.

    Because the list is in-memory, you will need to carefully
    coordinate use or assertions on this list when using multiple
    workers.

    Args:
        ls: Append each `(epoch, item)` to this list.

    Returns:
        Config object.
        Pass this to the `bytewax.dataflow.Dataflow.capture` operator.
    """

    # This is needed to avoid pytest trying to load this class as
    # a test since its name starts with "Test"
    __test__ = False

    def __new__(cls, ls):
        """
        In classes defined by PyO3 we can only use __new__, not __init__
        """
        # The builder ignores worker_index / worker_count and gives every
        # worker the list's bound `append` as its output handler.
        return super().__new__(cls, lambda wi, wn: ls.append)


class TestingOutputConfig(ManualOutputConfig):
    """
    Append each output item to a list.

    You only want to use this for unit testing.

    Because the list is in-memory, you will need to carefully
    coordinate use or assertions on this list when using multiple
    workers.

    Args:
        ls: Append each `item` to this list.

    Returns:
        Config object.
        Pass this to the `bytewax.dataflow.Dataflow.capture` operator.
    """

    # This is needed to avoid pytest trying to load this class as
    # a test since its name starts with "Test"
    __test__ = False

    def __new__(cls, ls):
        """
        In classes defined by PyO3 we can only use __new__, not __init__
        """
        # The builder ignores worker_index / worker_count and gives every
        # worker the list's bound `append` as its output handler.
        return super().__new__(cls, lambda wi, wn: ls.append)
```
Classes
class KafkaOutputConfig (brokers, topic, additional_properties)
Use Kafka as the output.
A `capture` using `KafkaOutputConfig` expects to receive data structured as two-tuples of bytes `(key, payload)` to form a Kafka record. The key may be `None`.
Args
brokers : List[str]
- List of `host:port` strings of Kafka brokers.
topic : str
- Topic to which the producer will send records.
additional_properties : dict
- Any additional configuration properties. Note that consumer group settings will be ignored. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for more options.
Returns
Config object. Pass this to the `Dataflow.capture()` operator.
Ancestors
OutputConfig
Instance variables
var additional_properties
- The additional configuration properties this config was created with.
var brokers
- The list of `host:port` broker strings this config was created with.
var topic
- The topic this config writes records to.
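Since a `capture` with `KafkaOutputConfig` expects `(key, payload)` two-tuples of bytes, items usually need converting in the step before it. A minimal sketch of such a conversion, assuming items are JSON-serializable dicts and that a hypothetical `user_id` field should become the record key; `to_kafka_record` is an illustrative name, not part of bytewax. The constructor call is left as a comment with placeholder broker and topic values, passed positionally in the order of the signature above.

```python
import json
from typing import Any, Dict, Optional, Tuple


def to_kafka_record(item: Dict[str, Any]) -> Tuple[Optional[bytes], bytes]:
    """Hypothetical helper: shape one item into a (key, payload) bytes tuple."""
    # Use the assumed "user_id" field as the key; Kafka record keys may be None.
    user_id = item.get("user_id")
    key = str(user_id).encode("utf-8") if user_id is not None else None
    # Serialize the whole item as the record payload.
    payload = json.dumps(item, sort_keys=True).encode("utf-8")
    return (key, payload)


# Placeholder broker and topic; argument order follows the documented signature:
# KafkaOutputConfig(["localhost:9092"], "my-output-topic", {})
```

Items without a `user_id` come out with a `None` key, which the description above allows.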
class ManualEpochOutputConfig (output_builder)
Call a Python callback function with each output epoch and item.
You probably want to use `ManualOutputConfig` unless you know you need specific epoch assignments for deep integration work.
Args
output_builder
- `output_builder(worker_index: int, worker_count: int) => output_handler(epoch_item: Tuple[int, Any])`. Builder function which returns a handler function for each worker thread, called with `(epoch, item)` whenever an item passes by this capture operator on this worker.
Returns
Config object. Pass this to the `Dataflow.capture()` operator.
Ancestors
OutputConfig
Subclasses
TestingEpochOutputConfig
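A minimal sketch of an `output_builder` with the shape documented above. The names `make_epoch_builder` and `sink` are hypothetical, and the direct handler calls at the bottom only stand in for what the capture operator does on each worker; they are not bytewax API.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List, Tuple

Handler = Callable[[Tuple[int, Any]], None]


def make_epoch_builder(
    sink: DefaultDict[int, List[Tuple[int, Any]]]
) -> Callable[[int, int], Handler]:
    """Return a hypothetical output_builder that groups items by epoch."""

    def output_builder(worker_index: int, worker_count: int) -> Handler:
        def output_handler(epoch_item: Tuple[int, Any]) -> None:
            # Each call receives one (epoch, item) pair.
            epoch, item = epoch_item
            # Record the item under its epoch, tagged with the emitting worker.
            sink[epoch].append((worker_index, item))

        return output_handler

    return output_builder


# Stand-in for the dataflow: build one handler per worker, then feed it
# (epoch, item) pairs the way the capture operator would on that worker.
sink: DefaultDict[int, List[Tuple[int, Any]]] = defaultdict(list)
builder = make_epoch_builder(sink)
handlers = [builder(index, 2) for index in range(2)]
handlers[0]((0, "a"))
handlers[1]((0, "b"))
handlers[0]((1, "c"))
```

In a real flow you would pass `make_epoch_builder(sink)` to `ManualEpochOutputConfig`. A shared in-memory dict like this is only visible to workers running in the same process.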
class ManualOutputConfig (output_builder)
Call a Python callback function with each output item.
Args
output_builder
- `output_builder(worker_index: int, worker_count: int) => output_handler(item: Any)`. Builder function which returns a handler function for each worker thread, called with `item` whenever an item passes by this capture operator on this worker.
Returns
Config object. Pass this to the `Dataflow.capture()` operator.
Ancestors
OutputConfig
Subclasses
TestingOutputConfig
Instance variables
var output_builder
- The `output_builder` function this config was created with.
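A sketch of a `ManualOutputConfig` builder that gives each worker its own file, so that (assuming distinct worker indices) no two workers write to the same file. The `out-{worker_index}.txt` naming, the directory argument, and `make_file_builder` itself are assumptions for illustration.

```python
import os
from typing import Any, Callable

Handler = Callable[[Any], None]


def make_file_builder(directory: str) -> Callable[[int, int], Handler]:
    """Return a hypothetical output_builder writing one file per worker."""

    def output_builder(worker_index: int, worker_count: int) -> Handler:
        # Each worker gets its own file, named after its index.
        path = os.path.join(directory, f"out-{worker_index}.txt")

        def output_handler(item: Any) -> None:
            # Reopen in append mode for every item: simple, but slow.
            with open(path, "a", encoding="utf-8") as f:
                f.write(f"{item}\n")

        return output_handler

    return output_builder
```

You would then pass `make_file_builder(some_directory)` to `ManualOutputConfig`. Reopening the file per item keeps the sketch short; a production sink would more likely keep the file handle open.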
class OutputConfig
Base class for an output config.
These define how a certain stream of data should be output.
Use a specific subclass of this that matches the output destination you'd like to write to.
Subclasses
KafkaOutputConfig
ManualEpochOutputConfig
ManualOutputConfig
StdOutputConfig
class StdOutputConfig
Write the output items to standard out.
Items must have a valid `__str__`. If not, map the items into a string before capture.
Returns
Config object. Pass this to the `Dataflow.capture()` operator.
Ancestors
OutputConfig
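One way to do that mapping step, sketched under the assumption that items are bytes, JSON-friendly containers, or objects whose `__str__` is already readable. `to_display_string` is a hypothetical helper you would map over the stream right before capture.

```python
import json
from typing import Any


def to_display_string(item: Any) -> str:
    """Hypothetical formatter: turn an item into a readable string."""
    if isinstance(item, bytes):
        # Decode raw bytes instead of printing their b'...' repr.
        return item.decode("utf-8", errors="replace")
    if isinstance(item, (dict, list)):
        # Render containers as key-sorted JSON; fall back to str() for
        # values json cannot encode natively.
        return json.dumps(item, sort_keys=True, default=str)
    return str(item)
```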
class TestingEpochOutputConfig (ls)
Append each output `(epoch, item)` to a list. You only want to use this for unit testing. Because the list is in-memory, you will need to carefully coordinate use or assertions on this list when using multiple workers.
Args
ls
- Append each `(epoch, item)` to this list.
Returns
Config object. Pass this to the `Dataflow.capture()` operator.
Ancestors
ManualEpochOutputConfig
OutputConfig
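Because several workers append to the same list, the order of entries after a run is not something a test should depend on. A minimal sketch of an order-insensitive assertion, assuming the captured `(epoch, item)` pairs are hashable; `assert_same_epoch_items` and the sample data are hypothetical.

```python
from collections import Counter
from typing import Any, List, Tuple


def assert_same_epoch_items(
    actual: List[Tuple[int, Any]], expected: List[Tuple[int, Any]]
) -> None:
    """Hypothetical helper: compare captured output ignoring arrival order."""
    # Counter compares as a multiset: duplicates still count, order does not.
    assert Counter(actual) == Counter(expected), (actual, expected)


# The list as it might look after a multi-worker run (made-up data).
out = [(1, "c"), (0, "a"), (0, "b")]
assert_same_epoch_items(out, [(0, "a"), (0, "b"), (1, "c")])
```

The list also only collects output from workers in the current process; in a multi-process run, each process would hold its own copy.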
class TestingOutputConfig (ls)
Append each output item to a list. You only want to use this for unit testing.
Because the list is in-memory, you will need to carefully coordinate use or assertions on this list when using multiple workers.
Args
ls
- Append each `item` to this list.
Returns
Config object. Pass this to the `Dataflow.capture()` operator.
Ancestors
ManualOutputConfig
OutputConfig
Inherited members
ManualOutputConfig: output_builder
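The `__new__` in the module source passes `lambda wi, wn: ls.append` as the builder: it ignores the worker index and count and hands every worker the same bound `append` method, so all workers write into one list. A small plain-Python simulation of that sharing, where ordinary threads stand in for bytewax workers (an assumption made only to illustrate the behavior, not how bytewax schedules workers).

```python
import threading
from typing import Any, Callable, List

out: List[Any] = []


def output_builder(wi: int, wn: int) -> Callable[[Any], None]:
    # Same idea as the TestingOutputConfig builder: ignore the worker
    # index and count, and return the shared list's bound append method.
    return out.append


def worker(worker_index: int, worker_count: int) -> None:
    # Each simulated worker builds its own handler, but every handler
    # appends to the same `out` list.
    handler = output_builder(worker_index, worker_count)
    for n in range(3):
        handler((worker_index, n))


threads = [threading.Thread(target=worker, args=(i, 2)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

All six items end up in `out`, but their relative order between workers is not guaranteed, which is why assertions on this list need care.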