bytewax.connectors.kafka#

Connectors for Kafka.

Importing this module requires the confluent-kafka package to be installed.

The input source returns a stream of KafkaSourceMessage. See the docstring for its use.

You can use KafkaSource and KafkaSink directly:

>>> from bytewax.connectors.kafka import KafkaSource, KafkaSink, KafkaSinkMessage
>>> from bytewax import operators as op
>>> from bytewax.dataflow import Dataflow
>>>
>>> brokers = ["localhost:19092"]
>>> flow = Dataflow("example")
>>> kinp = op.input("kafka-in", flow, KafkaSource(brokers, ["in-topic"]))
>>> processed = op.map("map", kinp, lambda x: KafkaSinkMessage(x.key, x.value))
>>> op.output("kafka-out", processed, KafkaSink(brokers, "out-topic"))

Or the custom operators:

>>> from bytewax.connectors.kafka import operators as kop, KafkaSinkMessage
>>> from bytewax import operators as op
>>> from bytewax.dataflow import Dataflow
>>>
>>> brokers = ["localhost:19092"]
>>> flow = Dataflow("example")
>>> kinp = kop.input("kafka-in", flow, brokers=brokers, topics=["in-topic"])
>>> errs = op.inspect("errors", kinp.errs).then(op.raises, "crash-on-err")
>>> processed = op.map("map", kinp.oks, lambda x: KafkaSinkMessage(x.key, x.value))
>>> kop.output("kafka-out", processed, brokers=brokers, topic="out-topic")
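
Either flow can then be launched with the usual Bytewax entry point, for example python -m bytewax.run my_module:flow, where my_module is a placeholder for the importable module that defines flow.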

Submodules#

Data#

K: TypeVar#

Type of key in a Kafka message.

V: TypeVar#

Type of value in a Kafka message.

K_co: TypeVar#

Type of key in a Kafka message.

V_co: TypeVar#

Type of value in a Kafka message.

K2: TypeVar#

Type of key in a modified Kafka message.

V2: TypeVar#

Type of value in a modified Kafka message.

MaybeStrBytes: TypeAlias#

Kafka message keys and values are optional.

BYTEWAX_CONSUMER_LAG_GAUGE#

Gauge metric reporting Kafka consumer lag.

SerializedKafkaSourceMessage: TypeAlias#

A fully serialized Kafka message from the consumer.

KafkaSourceError: TypeAlias#

An error from the Kafka source with original message.

SerializedKafkaSourceResult: TypeAlias#

Items emitted from the Kafka source.

Might be either raw serialized messages or an error from the consumer.

SerializedKafkaSinkMessage: TypeAlias#

A fully serialized Kafka message ready for the producer.

Both key and value are optional.
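
These aliases tie the items above together. The sketch below shows how they presumably compose; it mirrors the names documented on this page rather than quoting the module source, and MaybeStrBytes is assumed to be the optional str-or-bytes union its name suggests:

>>> from typing import Optional, Union
>>> from bytewax.connectors.kafka import KafkaError, KafkaSinkMessage, KafkaSourceMessage
>>>
>>> # Raw keys and values from confluent-kafka: str or bytes, and possibly absent.
>>> MaybeStrBytes = Optional[Union[str, bytes]]
>>> # What the consumer emits before any deserialization step.
>>> SerializedKafkaSourceMessage = KafkaSourceMessage[MaybeStrBytes, MaybeStrBytes]
>>> # A consumer error, still carrying the raw message it relates to.
>>> KafkaSourceError = KafkaError[MaybeStrBytes, MaybeStrBytes]
>>> # KafkaSource emits one or the other, depending on raise_on_errors.
>>> SerializedKafkaSourceResult = Union[SerializedKafkaSourceMessage, KafkaSourceError]
>>> # What KafkaSink expects to consume from the dataflow.
>>> SerializedKafkaSinkMessage = KafkaSinkMessage[MaybeStrBytes, MaybeStrBytes]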

Classes#

class KafkaSourceMessage#

Message read from Kafka.

key: K#
value: V#
topic: Optional[str]#
headers: List[Tuple[str, bytes]]#
latency: Optional[float]#
offset: Optional[int]#
partition: Optional[int]#
timestamp: Optional[Tuple[int, int]]#

to_sink() KafkaSinkMessage[K, V]#

Convert a source message to be used with a sink.

Only key, value and timestamp are used.
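
For example, a pass-through flow can forward consumed messages directly to a producer; the broker address and topic names below are placeholders in the same style as the examples at the top of this page:

>>> from bytewax import operators as op
>>> from bytewax.connectors.kafka import KafkaSink, KafkaSource
>>> from bytewax.dataflow import Dataflow
>>>
>>> brokers = ["localhost:19092"]
>>> flow = Dataflow("echo")
>>> kinp = op.input("kafka-in", flow, KafkaSource(brokers, ["in-topic"]))
>>> # Keep each message's key, value and timestamp and produce it unchanged.
>>> out = op.map("to-sink", kinp, lambda msg: msg.to_sink())
>>> op.output("kafka-out", out, KafkaSink(brokers, "out-topic"))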

class KafkaError#

Error from a KafkaSource.

err: KafkaError#

Underlying confluent_kafka.KafkaError returned by the consumer.

msg: KafkaSourceMessage[K, V]#

Message attached to that error.
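
When a KafkaSource is created with raise_on_errors=False, these error objects share the stream with ordinary messages, so a flow typically separates them. A minimal sketch using isinstance checks (broker and topic names are placeholders):

>>> from bytewax import operators as op
>>> from bytewax.connectors.kafka import KafkaError, KafkaSource, KafkaSourceMessage
>>> from bytewax.dataflow import Dataflow
>>>
>>> brokers = ["localhost:19092"]
>>> flow = Dataflow("tolerant")
>>> src = KafkaSource(brokers, ["in-topic"], raise_on_errors=False)
>>> kinp = op.input("kafka-in", flow, src)
>>> # Successful messages continue downstream; errors are only inspected here.
>>> oks = op.filter("oks", kinp, lambda m: isinstance(m, KafkaSourceMessage))
>>> errs = op.filter("errs", kinp, lambda m: isinstance(m, KafkaError))
>>> errs_logged = op.inspect("log-errs", errs)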

class KafkaSource(
brokers: Iterable[str],
topics: Iterable[str],
tail: bool = True,
starting_offset: int = OFFSET_BEGINNING,
add_config: Optional[Dict[str, str]] = None,
batch_size: int = 1000,
raise_on_errors: bool = True,
)#

Use a set of Kafka topics as an input source.

Partitions are the unit of parallelism. Can support exactly-once processing.

Messages are emitted into the dataflow as SerializedKafkaSourceResult objects.

Initialization

Parameters:
  • brokers – List of host:port strings of Kafka brokers.

  • topics – List of topics to consume from.

  • tail – Whether to wait for new data on this topic when the end is initially reached.

  • starting_offset – Can be either confluent_kafka.OFFSET_BEGINNING or confluent_kafka.OFFSET_END. Defaults to beginning of topic.

  • add_config – Any additional configuration properties. See the rdkafka documentation for options; a configuration sketch follows this parameter list.

  • batch_size – Maximum number of messages to consume at each poll. Defaults to 1000, which is a suitable starting point for higher-throughput dataflows but can be tuned lower to potentially decrease individual message processing latency.

  • raise_on_errors – If set to False, errors won’t stop the dataflow and will instead be emitted into the stream as KafkaError items.
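
For illustration, this sketch passes standard librdkafka properties through add_config and starts from the end of each topic; the broker address, credentials, and topic are placeholders:

>>> from bytewax import operators as op
>>> from bytewax.connectors.kafka import KafkaSource
>>> from bytewax.dataflow import Dataflow
>>> from confluent_kafka import OFFSET_END
>>>
>>> source = KafkaSource(
...     brokers=["localhost:19092"],
...     topics=["in-topic"],
...     starting_offset=OFFSET_END,  # only read messages produced from now on
...     add_config={
...         # Standard rdkafka properties; replace the values with your own.
...         "security.protocol": "SASL_SSL",
...         "sasl.mechanism": "PLAIN",
...         "sasl.username": "user",
...         "sasl.password": "password",
...     },
...     batch_size=100,  # smaller batches can lower per-message latency
...     raise_on_errors=False,  # emit errors into the stream instead of raising
... )
>>> flow = Dataflow("configured")
>>> kinp = op.input("kafka-in", flow, source)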

list_parts() List[str]#

Each Kafka partition is an input partition.

build_part(
step_id: str,
for_part: str,
resume_state: Optional[int],
) bytewax.connectors.kafka._KafkaSourcePartition#

See ABC docstring.

class KafkaSinkMessage#

Message to be written to Kafka.

key: K_co#
value: V_co#
topic: Optional[str]#
headers: List[Tuple[str, bytes]]#
partition: Optional[int]#
timestamp: int = 0#

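For instance, a message can be built with an explicit topic, headers, and timestamp. The values below are purely illustrative, and the millisecond timestamp unit is an assumption based on the usual Kafka producer convention:

>>> from bytewax.connectors.kafka import KafkaSinkMessage
>>>
>>> msg = KafkaSinkMessage(
...     key=b"user-123",
...     value=b'{"clicks": 1}',
...     topic="out-topic",  # used when the sink itself is given topic=None
...     headers=[("source", b"clickstream")],
...     timestamp=1_700_000_000_000,  # assumed epoch milliseconds; 0 (the default) lets the producer decide
... )
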
class KafkaSink(
brokers: Iterable[str],
topic: Optional[str],
add_config: Optional[Dict[str, str]] = None,
)#

Use a single Kafka topic as an output sink.

Items consumed from the dataflow must be SerializedKafkaSinkMessage.

Workers are the unit of parallelism.

Can support at-least-once processing. Messages from the resume epoch will be duplicated right after resume.

Initialization

Parameters:
  • brokers – List of host:port strings of Kafka brokers.

  • topic – Topic to produce to. If None, the topic to produce to is read from each KafkaSinkMessage; see the sketch after this parameter list.

  • add_config – Any additional configuration properties. See the rdkafka documentation for options.
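
As noted above for the topic parameter, passing topic=None makes routing per-message; a sketch of that pattern (broker address and topic names are placeholders):

>>> from bytewax import operators as op
>>> from bytewax.connectors.kafka import KafkaSink, KafkaSinkMessage, KafkaSource
>>> from bytewax.dataflow import Dataflow
>>>
>>> brokers = ["localhost:19092"]
>>> flow = Dataflow("router")
>>> kinp = op.input("kafka-in", flow, KafkaSource(brokers, ["in-topic"]))
>>> def route(msg):
...     # Send messages without a value to a dead-letter topic, the rest onward.
...     topic = "dead-letter" if msg.value is None else "out-topic"
...     return KafkaSinkMessage(msg.key, msg.value, topic=topic)
...
>>> routed = op.map("route", kinp, route)
>>> op.output("kafka-out", routed, KafkaSink(brokers, topic=None))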

build(
_step_id: str,
worker_index: int,
worker_count: int,
) bytewax.connectors.kafka._KafkaSinkPartition#

See ABC docstring.
