bytewax.connectors.kafka
Connectors for Kafka.

Importing this module requires the confluent-kafka package to be installed.

The input source returns a stream of KafkaSourceMessage. See the docstring for its use.

You can use KafkaSource and KafkaSink directly:
>>> from bytewax.connectors.kafka import KafkaSource, KafkaSink, KafkaSinkMessage
>>> from bytewax import operators as op
>>> from bytewax.dataflow import Dataflow
>>>
>>> brokers = ["localhost:19092"]
>>> flow = Dataflow("example")
>>> kinp = op.input("kafka-in", flow, KafkaSource(brokers, ["in-topic"]))
>>> processed = op.map("map", kinp, lambda x: KafkaSinkMessage(x.key, x.value))
>>> op.output("kafka-out", processed, KafkaSink(brokers, "out-topic"))
Or the custom operators:
>>> from bytewax.connectors.kafka import operators as kop, KafkaSinkMessage
>>> from bytewax import operators as op
>>> from bytewax.dataflow import Dataflow
>>>
>>> brokers = ["localhost:19092"]
>>> flow = Dataflow("example")
>>> kinp = kop.input("kafka-in", flow, brokers=brokers, topics=["in-topic"])
>>> errs = op.inspect("errors", kinp.errs).then(op.raises, "crash-on-err")
>>> processed = op.map("map", kinp.oks, lambda x: KafkaSinkMessage(x.key, x.value))
>>> kop.output("kafka-out", processed, brokers=brokers, topic="out-topic")
Submodules

- operators
Data

- BYTEWAX_CONSUMER_LAG_GAUGE
  Gauge(…)
Classes
- class KafkaSourceMessage

  Message read from Kafka.
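  As a sketch of working with these messages in a map step — the topic, partition, offset, and key attributes used here are assumed field names, so check the class's docstring for the exact layout:

  >>> from bytewax import operators as op
  >>> from bytewax.connectors.kafka import KafkaSource
  >>> from bytewax.dataflow import Dataflow
  >>>
  >>> flow = Dataflow("describe-msgs")
  >>> kinp = op.input("kafka-in", flow, KafkaSource(["localhost:19092"], ["in-topic"]))
  >>> def describe(msg):
  ...     # topic/partition/offset/key are assumed attribute names on KafkaSourceMessage
  ...     return f"{msg.topic}[{msg.partition}]@{msg.offset}: key={msg.key!r}"
  >>> descs = op.map("describe", kinp, describe)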
- class KafkaError

  Error from a KafkaSource.

  - err: KafkaError
    Underlying error from the consumer.

  - msg: KafkaSourceMessage[K, V]
    Message attached to that error.
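  When the source is built with raise_on_errors=False, these error objects flow through the dataflow alongside regular messages. A minimal sketch of separating the two, assuming the mixed stream can be split with an isinstance check:

  >>> from bytewax import operators as op
  >>> from bytewax.connectors.kafka import KafkaError, KafkaSource
  >>> from bytewax.dataflow import Dataflow
  >>>
  >>> flow = Dataflow("split-errors")
  >>> mixed = op.input("kafka-in", flow, KafkaSource(["localhost:19092"], ["in-topic"], raise_on_errors=False))
  >>> branched = op.branch("is-err", mixed, lambda x: isinstance(x, KafkaError))
  >>> errs = op.inspect("log-errs", branched.trues)  # print errors as they arrive
  >>> oks = branched.falses  # clean KafkaSourceMessage stream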
- class KafkaSource(
    brokers: Iterable[str],
    topics: Iterable[str],
    tail: bool = True,
    starting_offset: int = OFFSET_BEGINNING,
    add_config: Optional[Dict[str, str]] = None,
    batch_size: int = 1000,
    raise_on_errors: bool = True,
  )
  Use a set of Kafka topics as an input source.

  Partitions are the unit of parallelism. Can support exactly-once processing.

  Messages are emitted into the dataflow as SerializedKafkaSourceResult objects.

  Initialization
  - Parameters:
    - brokers – List of host:port strings of Kafka brokers.
    - topics – List of topics to consume from.
    - tail – Whether to wait for new data on this topic when the end is initially reached.
    - starting_offset – Can be either confluent_kafka.OFFSET_BEGINNING or confluent_kafka.OFFSET_END. Defaults to the beginning of the topic.
    - add_config – Any additional configuration properties. See the rdkafka documentation for options.
    - batch_size – Maximum number of messages to consume at each poll. Defaults to 1000, a suitable starting point for higher-throughput dataflows; tune it lower to potentially decrease individual message processing latency.
    - raise_on_errors – If set to False, errors won't stop the dataflow and will be emitted into it instead.
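  Putting the parameters together, a hedged configuration sketch — the add_config key shown is an ordinary rdkafka property used purely as an illustration:

  >>> from confluent_kafka import OFFSET_END
  >>> from bytewax.connectors.kafka import KafkaSource
  >>>
  >>> source = KafkaSource(
  ...     brokers=["localhost:19092"],
  ...     topics=["in-topic"],
  ...     tail=True,                    # keep waiting for new records
  ...     starting_offset=OFFSET_END,   # only read records produced from now on
  ...     add_config={"security.protocol": "SSL"},  # illustrative rdkafka property
  ...     batch_size=100,               # smaller batches can lower per-message latency
  ...     raise_on_errors=False,        # emit errors into the dataflow instead of raising
  ... )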
- class KafkaSinkMessage

  Message to be written to Kafka.
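  A brief construction sketch; key and value are the fields used in the module examples above, while the optional topic field is an assumption based on KafkaSink's topic=None behavior described below:

  >>> from bytewax.connectors.kafka import KafkaSinkMessage
  >>>
  >>> msg = KafkaSinkMessage(
  ...     key=b"user-1",
  ...     value=b'{"clicks": 3}',
  ...     topic="out-topic",  # assumed optional field; used when the sink's topic is None
  ... )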
- class KafkaSink(
    brokers: Iterable[str],
    topic: Optional[str],
    add_config: Optional[Dict[str, str]] = None,
  )

  Use a single Kafka topic as an output sink.
  Items consumed from the dataflow must be SerializedKafkaSinkMessage.

  Workers are the unit of parallelism.

  Can support at-least-once processing. Messages from the resume epoch will be duplicated right after resume.

  Initialization
  - Parameters:
    - brokers – List of host:port strings of Kafka brokers.
    - topic – Topic to produce to. If it's None, the topic to produce to is read from each KafkaSinkMessage.
    - add_config – Any additional configuration properties. See the rdkafka documentation for options.
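  Since a None topic defers to each message, per-message routing can be sketched like this (the topic names and routing rule are illustrative, and the KafkaSinkMessage topic field is the same assumption as above):

  >>> from bytewax import operators as op
  >>> from bytewax.connectors.kafka import KafkaSink, KafkaSinkMessage, KafkaSource
  >>> from bytewax.dataflow import Dataflow
  >>>
  >>> brokers = ["localhost:19092"]
  >>> flow = Dataflow("route-by-key")
  >>> kinp = op.input("kafka-in", flow, KafkaSource(brokers, ["in-topic"]))
  >>> def route(msg):
  ...     # pick an output topic per message; relies on topic=None on the sink below
  ...     out = "hot-topic" if msg.key == b"hot" else "cold-topic"
  ...     return KafkaSinkMessage(msg.key, msg.value, topic=out)
  >>> routed = op.map("route", kinp, route)
  >>> op.output("kafka-out", routed, KafkaSink(brokers, None))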