Module bytewax.connectors.kafka

Connectors for Kafka.

Importing this module requires the confluent-kafka package to be installed.
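For orientation, here is a minimal sketch of a dataflow that reads from one Kafka topic and writes to another. The broker address and the topic names (in_topic, out_topic) are placeholders; adjust them for your cluster.

from bytewax.dataflow import Dataflow
from bytewax.connectors.kafka import KafkaInput, KafkaOutput

flow = Dataflow()
# Items arrive as `(key_bytes, value_bytes)` two-tuples.
flow.input("inp", KafkaInput(["localhost:9092"], ["in_topic"]))
# Upper-case the value bytes; keys pass through unchanged.
flow.map(lambda kv: (kv[0], kv[1].upper()))
# KafkaOutput expects the same `(key_bytes, value_bytes)` shape.
flow.output("out", KafkaOutput(["localhost:9092"], "out_topic"))

On recent Bytewax releases this can be run with the usual entry point, e.g. python -m bytewax.run your_module:flow.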

Source code
"""Connectors for [Kafka](https://kafka.apache.org).

Importing this module requires the
[`confluent-kafka`](https://github.com/confluentinc/confluent-kafka-python)
package to be installed.

"""
from typing import Dict, Iterable

from confluent_kafka import (
    Consumer,
    KafkaError,
    OFFSET_BEGINNING,
    Producer,
    TopicPartition,
)
from confluent_kafka.admin import AdminClient

from bytewax.inputs import PartitionedInput, StatefulSource
from bytewax.outputs import DynamicOutput, StatelessSink

__all__ = [
    "KafkaInput",
    "KafkaOutput",
]


def _list_parts(client, topics):
    for topic in topics:
        # List topics one-by-one so if auto-create is turned on,
        # we respect that.
        cluster_metadata = client.list_topics(topic)
        topic_metadata = cluster_metadata.topics[topic]
        if topic_metadata.error is not None:
            raise RuntimeError(
                f"error listing partitions for Kafka topic `{topic!r}`: "
                f"{topic_metadata.error.str()}"
            )
        part_idxs = topic_metadata.partitions.keys()
        for i in part_idxs:
            yield f"{i}-{topic}"


class _KafkaSource(StatefulSource):
    def __init__(self, consumer, topic, part_idx, starting_offset, resume_state):
        self._offset = resume_state or starting_offset
        # Assign does not activate consumer grouping.
        consumer.assign([TopicPartition(topic, part_idx, self._offset)])
        self._consumer = consumer
        self._topic = topic

    def next(self):
        msg = self._consumer.poll(0.001)  # seconds
        if msg is None:
            return
        elif msg.error() is not None:
            if msg.error().code() == KafkaError._PARTITION_EOF:
                raise StopIteration()
            else:
                raise RuntimeError(
                    f"error consuming from Kafka topic `{self.topic!r}`: {msg.error()}"
                )
        else:
            item = (msg.key(), msg.value())
            # Resume reading from the next message, not this one.
            self._offset = msg.offset() + 1
            return item

    def snapshot(self):
        return self._offset

    def close(self):
        self._consumer.close()


class KafkaInput(PartitionedInput):
    """Use [Kafka](https://kafka.apache.org) topics as an input
    source.

    Kafka messages are emitted into the dataflow as two-tuples of
    `(key_bytes, value_bytes)`.

    Partitions are the unit of parallelism.

    Can support exactly-once processing.

    Args:

        brokers: List of `host:port` strings of Kafka brokers.

        topics: List of topics to consume from.

        tail: Whether to wait for new data on these topics when the
            end is initially reached.

        starting_offset: Can be either
            `confluent_kafka.OFFSET_BEGINNING` or
            `confluent_kafka.OFFSET_END`. Defaults to the beginning
            of the topic.

        add_config: Any additional configuration properties. See [the
            `rdkafka`
            documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
            for options.

    """

    def __init__(
        self,
        brokers: Iterable[str],
        topics: Iterable[str],
        tail: bool = True,
        starting_offset: int = OFFSET_BEGINNING,
        add_config: Dict[str, str] = None,
    ):
        add_config = add_config or {}

        if isinstance(brokers, str):
            raise TypeError("brokers must be an iterable and not a string")
        self._brokers = brokers
        if isinstance(topics, str):
            raise TypeError("topics must be an iterable and not a string")
        self._topics = topics
        self._tail = tail
        self._starting_offset = starting_offset
        self._add_config = add_config

    def list_parts(self):
        config = {
            "bootstrap.servers": ",".join(self._brokers),
        }
        config.update(self._add_config)
        client = AdminClient(config)

        return set(_list_parts(client, self._topics))

    def build_part(self, for_part, resume_state):
        part_idx, topic = for_part.split("-", 1)
        part_idx = int(part_idx)
        # TODO: Warn and then return None. This might be an indication
        # of dataflow continuation with a new topic (to enable
        # re-partitioning), which is fine.
        assert topic in self._topics, "Can't resume from different set of Kafka topics"

        config = {
            # We'll manage our own "consumer group" via recovery
            # system.
            "group.id": "BYTEWAX_IGNORED",
            "enable.auto.commit": "false",
            "bootstrap.servers": ",".join(self._brokers),
            "enable.partition.eof": str(not self._tail),
        }
        config.update(self._add_config)
        consumer = Consumer(config)
        return _KafkaSource(
            consumer, topic, part_idx, self._starting_offset, resume_state
        )


class _KafkaSink(StatelessSink):
    def __init__(self, producer, topic):
        self._producer = producer
        self._topic = topic

    def write(self, key_value):
        key, value = key_value
        self._producer.produce(self._topic, value, key)
        self._producer.flush()

    def close(self):
        self._producer.flush()


class KafkaOutput(DynamicOutput):
    """Use a single [Kafka](https://kafka.apache.org) topic as an
    output sink.

    Items consumed from the dataflow must look like two-tuples of
    `(key_bytes, value_bytes)`. Default partition routing is used.

    Workers are the unit of parallelism.

    Can support at-least-once processing. Messages from the resume
    epoch will be duplicated right after resume.

    Args:

        brokers: List of `host:port` strings of Kafka brokers.

        topic: Topic to produce to.

        add_config: Any additional configuration properties. See [the
            `rdkafka`
            documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
            for options.

    """

    def __init__(
        self,
        brokers: Iterable[str],
        topic: str,
        add_config: Dict[str, str] = None,
    ):
        add_config = add_config or {}

        self._brokers = brokers
        self._topic = topic
        self._add_config = add_config

    def build(self, worker_index, worker_count):
        config = {
            "bootstrap.servers": ",".join(self._brokers),
        }
        config.update(self._add_config)
        producer = Producer(config)
        return _KafkaSink(producer, self._topic)

Classes

class KafkaInput (brokers: Iterable[str], topics: Iterable[str], tail: bool = True, starting_offset: int = -2, add_config: Dict[str, str] = None)

Use Kafka topics as an input source.

Kafka messages are emitted into the dataflow as two-tuples of (key_bytes, value_bytes).

Partitions are the unit of parallelism.

Can support exactly-once processing.

Args

brokers
List of host:port strings of Kafka brokers.
topics
List of topics to consume from.
tail
Whether to wait for new data on these topics when the end is initially reached.
starting_offset
Can be either confluent_kafka.OFFSET_BEGINNING or confluent_kafka.OFFSET_END. Defaults to the beginning of the topic.
add_config
Any additional configuration properties. See the rdkafka documentation for options.
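As an illustrative sketch, here is one way to construct this input with the optional arguments filled in. The broker addresses, topic name, and SASL credentials are placeholders, and the add_config keys are standard rdkafka properties:

from confluent_kafka import OFFSET_END

from bytewax.connectors.kafka import KafkaInput

inp = KafkaInput(
    brokers=["broker-1:9092", "broker-2:9092"],
    topics=["events"],
    tail=True,  # keep waiting for new messages after reaching the end
    starting_offset=OFFSET_END,  # start at the end instead of the beginning
    add_config={
        # Passed straight through to librdkafka, e.g. for an authenticated cluster.
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.username": "user",
        "sasl.password": "password",
    },
)

Pass the resulting object to Dataflow.input as shown in the module overview above.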
Source code
class KafkaInput(PartitionedInput):
    """Use [Kafka](https://kafka.apache.org) topics as an input
    source.

    Kafka messages are emitted into the dataflow as two-tuples of
    `(key_bytes, value_bytes)`.

    Partitions are the unit of parallelism.

    Can support exactly-once processing.

    Args:

        brokers: List of `host:port` strings of Kafka brokers.

        topics: List of topics to consume from.

        tail: Whether to wait for new data on these topics when the
            end is initially reached.

        starting_offset: Can be either
            `confluent_kafka.OFFSET_BEGINNING` or
            `confluent_kafka.OFFSET_END`. Defaults to the beginning
            of the topic.

        add_config: Any additional configuration properties. See [the
            `rdkafka`
            documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
            for options.

    """

    def __init__(
        self,
        brokers: Iterable[str],
        topics: Iterable[str],
        tail: bool = True,
        starting_offset: int = OFFSET_BEGINNING,
        add_config: Dict[str, str] = None,
    ):
        add_config = add_config or {}

        if isinstance(brokers, str):
            raise TypeError("brokers must be an iterable and not a string")
        self._brokers = brokers
        if isinstance(topics, str):
            raise TypeError("topics must be an iterable and not a string")
        self._topics = topics
        self._tail = tail
        self._starting_offset = starting_offset
        self._add_config = add_config

    def list_parts(self):
        config = {
            "bootstrap.servers": ",".join(self._brokers),
        }
        config.update(self._add_config)
        client = AdminClient(config)

        return set(_list_parts(client, self._topics))

    def build_part(self, for_part, resume_state):
        part_idx, topic = for_part.split("-", 1)
        part_idx = int(part_idx)
        # TODO: Warn and then return None. This might be an indication
        # of dataflow continuation with a new topic (to enable
        # re-partitioning), which is fine.
        assert topic in self._topics, "Can't resume from different set of Kafka topics"

        config = {
            # We'll manage our own "consumer group" via recovery
            # system.
            "group.id": "BYTEWAX_IGNORED",
            "enable.auto.commit": "false",
            "bootstrap.servers": ",".join(self._brokers),
            "enable.partition.eof": str(not self._tail),
        }
        config.update(self._add_config)
        consumer = Consumer(config)
        return _KafkaSource(
            consumer, topic, part_idx, self._starting_offset, resume_state
        )

Ancestors

bytewax.inputs.PartitionedInput

Inherited members

class KafkaOutput (brokers: Iterable[str], topic: str, add_config: Dict[str, str] = None)

Use a single Kafka topic as an output sink.

Items consumed from the dataflow must look like two-tuples of (key_bytes, value_bytes). Default partition routing is used.

Workers are the unit of parallelism.

Can support at-least-once processing. Messages from the resume epoch will be duplicated right after resume.

Args

brokers
List of host:port strings of Kafka brokers.
topic
Topic to produce to.
add_config
Any additional configuration properties. See the rdkafka documentation for options.
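As an illustrative sketch, here is a construction example. The broker address and topic name are placeholders, and compression.type is one example of an rdkafka producer property that can be passed via add_config:

from bytewax.connectors.kafka import KafkaOutput

out = KafkaOutput(
    brokers=["localhost:9092"],
    topic="processed_events",
    # Any extra producer properties are passed through to librdkafka.
    add_config={"compression.type": "gzip"},
)

Remember that every item reaching this output must already be a (key_bytes, value_bytes) two-tuple, so encode strings (for example with str.encode) in an upstream step.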
Source code
class KafkaOutput(DynamicOutput):
    """Use a single [Kafka](https://kafka.apache.org) topic as an
    output sink.

    Items consumed from the dataflow must look like two-tuples of
    `(key_bytes, value_bytes)`. Default partition routing is used.

    Workers are the unit of parallelism.

    Can support at-least-once processing. Messages from the resume
    epoch will be duplicated right after resume.

    Args:

        brokers: List of `host:port` strings of Kafka brokers.

        topic: Topic to produce to.

        add_config: Any additional configuration properties. See [the
            `rdkafka`
            documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
            for options.

    """

    def __init__(
        self,
        brokers: Iterable[str],
        topic: str,
        add_config: Dict[str, str] = None,
    ):
        add_config = add_config or {}

        self._brokers = brokers
        self._topic = topic
        self._add_config = add_config

    def build(self, worker_index, worker_count):
        config = {
            "bootstrap.servers": ",".join(self._brokers),
        }
        config.update(self._add_config)
        producer = Producer(config)
        return _KafkaSink(producer, self._topic)

Ancestors

bytewax.outputs.DynamicOutput

Inherited members