Module bytewax.recovery

Recovering from failures.

Bytewax allows you to recover a stateful dataflow; it lets you resume processing and output after a failure without re-processing all input from the beginning to re-calculate all internal state.

It does this by storing state and progress information for a single dataflow instance in a recovery store backed by a durable state storage system of your choosing, e.g. SQLite or Kafka. See the subclasses of RecoveryConfig in this module for the supported datastores, the specifics of how each is utilized, and tradeoffs.

Preparation

  1. Create a recovery config for describing how to connect to the recovery store of your choosing.

  2. Pass that recovery config as the recovery_config argument to the entry point running your dataflow (e.g. bytewax.cluster_main()), as sketched below.
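
For example, a minimal sketch of both steps using a SQLite-backed recovery store and run_main (both are documented later in this module). The directory path is a placeholder, and the Dataflow import path is assumed to be bytewax.dataflow:

>>> from bytewax.dataflow import Dataflow  # assumed import path for Dataflow
>>> from bytewax.execution import run_main
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> from bytewax.recovery import SqliteRecoveryConfig
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> flow.capture(StdOutputConfig())
>>> # 1. Describe how to connect to the recovery store.
>>> recovery_config = SqliteRecoveryConfig("/var/lib/my-dataflow-recovery")  # placeholder; must be an existing directory, distinct per dataflow
>>> # 2. Pass it to the execution entry point.
>>> run_main(flow, recovery_config=recovery_config)  # doctest: +SKIP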

Execution

Make sure your worker processes have access to the recovery store.

Then, run your dataflow! It will start backing up recovery data automatically.

Recovering

If the dataflow fails, first you must fix whatever underlying fault caused the issue. That might mean deploying new code which fixes a bug or resolving an issue with a connected system.

Once that is done, re-run the dataflow using the same recovery config and thus re-connect to the same recovery store. Bytewax will automatically read the progress of the previous dataflow execution and determine the most recent point that processing can resume at. Output should resume from that point.

If you want to fully restart a dataflow and ignore previous state, delete the data in the recovery store using whatever operational tools you have for that storage type.
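
For example, with a SQLite-backed recovery store a full restart is a matter of deleting the per-worker DB files before re-running. This is a rough sketch; the directory path is a placeholder and the file names follow the "worker0.sqlite3" pattern described under SqliteRecoveryConfig below. For a Kafka-backed store, instead delete the "topic_prefix-progress" and "topic_prefix-state" topics with your usual Kafka admin tooling.

>>> from pathlib import Path
>>> db_dir = Path("/var/lib/my-dataflow-recovery")  # placeholder recovery directory
>>> for db_file in db_dir.glob("worker*.sqlite3*"):  # doctest: +SKIP
...     db_file.unlink()  # removes files like "worker0.sqlite3" plus any -wal / -journal sidecars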

Caveats

Recovery data for multiple dataflows must not be mixed together. See the docs for each RecoveryConfig subclass for what this means depending on the recovery store. E.g. when using a KafkaRecoveryConfig, each dataflow must have a distinct topic prefix.
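
For example, two dataflows sharing one Kafka cluster each need their own prefix (the broker address and prefixes here are placeholders):

>>> orders_recovery = KafkaRecoveryConfig(["localhost:9092"], "orders-flow")
>>> billing_recovery = KafkaRecoveryConfig(["localhost:9092"], "billing-flow")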

See comments on input configuration types in bytewax.inputs for any limitations each might have regarding recovery.

It is possible that your output systems will see duplicate data around the resume point; design your systems to support at-least-once processing.
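
One common way to cope with these duplicates is to make downstream writes idempotent, e.g. upserts keyed on a deterministic ID, so a replayed item around the resume epoch overwrites the same record instead of adding a new one. A minimal sketch, using a plain dict as a stand-in for your real sink:

>>> sink = {}  # stand-in for an idempotent key-value sink
>>> def write(item):
...     key, value = item  # assumes each item carries a stable, deterministic key
...     sink[key] = value  # upsert: a duplicate delivery overwrites, it does not accumulate
>>> write(("user-1", 42))
>>> write(("user-1", 42))  # same item re-delivered after a resume
>>> len(sink)
1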

Currently it is not possible to recover a dataflow with a different number of workers than when it failed.

The epoch is the time unit of recovery: a dataflow will only resume on an epoch boundary, and the epoch it resumes at is called the resume epoch. Bytewax defaults to a new epoch every 10 seconds. See EpochConfig for more info on epochs.
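
Shorter epochs mean less re-processed work after a resume, at the cost of more frequent recovery writes. As a hedged sketch, reusing the flow and recovery_config from the sketch above and assuming the periodic subclass of EpochConfig is named PeriodicEpochConfig and takes an epoch_length timedelta (check bytewax.execution for the exact name and signature):

>>> from datetime import timedelta
>>> from bytewax.execution import run_main, PeriodicEpochConfig  # PeriodicEpochConfig is an assumption here
>>> run_main(
...     flow,
...     epoch_config=PeriodicEpochConfig(epoch_length=timedelta(seconds=2)),
...     recovery_config=recovery_config,
... )  # doctest: +SKIP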

Expand source code
"""Recovering from failures.

Bytewax allows you to **recover** a stateful dataflow; it lets you
resume processing and output after a failure without re-processing
all input from the beginning to re-calculate all internal state.

It does this by storing state and progress information for a single
dataflow instance in a **recovery store** backed by a durable state
storage system of your choosing, e.g. SQLite or Kafka. See the
subclasses of `RecoveryConfig` in this module for the supported
datastores, the specifics of how each is utilized, and tradeoffs.

Preparation
-----------

1. Create a **recovery config** for describing how to connect to the
recovery store of your choosing.

2. Pass that recovery config as the `recovery_config` argument to the
entry point running your dataflow (e.g. `bytewax.cluster_main()`).

Execution
---------

Make sure your worker processes have access to the recovery store.

Then, run your dataflow! It will start backing up recovery data
automatically.

Recovering
----------

If the dataflow fails, first you must fix whatever underlying fault
caused the issue. That might mean deploying new code which fixes a bug
or resolving an issue with a connected system.

Once that is done, re-run the dataflow using the _same recovery
config_ and thus re-connect to the _same recovery store_. Bytewax will
automatically read the progress of the previous dataflow execution and
determine the most recent point that processing can resume at. Output
should resume from that point.

If you want to fully restart a dataflow and ignore previous state,
delete the data in the recovery store using whatever operational tools
you have for that storage type.

Caveats
-------

Recovery data for multiple dataflows _must not_ be mixed together. See
the docs for each `RecoveryConfig` subclass for what this means
depending on the recovery store. E.g. when using a
`KafkaRecoveryConfig`, each dataflow must have a distinct topic
prefix.

See comments on input configuration types in `bytewax.inputs` for any
limitations each might have regarding recovery.

It is possible that your output systems will see duplicate data around
the resume point; design your systems to support at-least-once
processing.

Currently it is not possible to recover a dataflow with a different
number of workers than when it failed.

The epoch is the time unit of recovery: a dataflow will only resume
on an epoch boundary, and the epoch it resumes at is called the
**resume epoch**. Bytewax defaults to a new epoch every 10 seconds.
See `bytewax.execution.EpochConfig` for more info on epochs.

"""

from .bytewax import (  # noqa: F401
    KafkaRecoveryConfig,
    RecoveryConfig,
    SqliteRecoveryConfig,
)

__all__ = [
    "KafkaRecoveryConfig",
    "RecoveryConfig",
    "SqliteRecoveryConfig",
]

Classes

class KafkaRecoveryConfig (brokers, topic_prefix)

Use Kafka to store recovery data.

Uses a "progress" topic and a "state" topic with a number of partitions equal to the number of workers. Will take advantage of log compaction so that topic size is proportional to state size, not epoch count.

Use a distinct topic prefix per dataflow so recovery data is not mixed.

>>> from bytewax.dataflow import Dataflow
>>> from bytewax.execution import run_main
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> flow.capture(StdOutputConfig())
>>> recovery_config = KafkaRecoveryConfig(
...     ["localhost:9092"],
...     "sample-dataflow",
... )
>>> run_main(
...     flow,
...     recovery_config=recovery_config,
... )  # doctest: +ELLIPSIS
(...)

If there's no previous recovery data, topics will automatically be created with the correct number of partitions and log compaction enabled.

Args

brokers : List[str]
List of host:port strings of Kafka brokers.
topic_prefix : str
Prefix used for naming topics. Must be distinct per-dataflow. Two topics will be created using this prefix: "topic_prefix-progress" and "topic_prefix-state".

Returns

Config object. Pass this as the recovery_config argument to your execution entry point.

Ancestors

RecoveryConfig

Instance variables

var brokers

List of host:port strings of the Kafka brokers this config was created with.

var topic_prefix

Prefix used for naming this dataflow's recovery topics.

class RecoveryConfig

Base class for a recovery config.

This describes how each worker in a dataflow cluster should store its recovery data.

Use a specific subclass of this that matches the kind of storage system you are going to use. See the subclasses in this module.

Subclasses

KafkaRecoveryConfig
SqliteRecoveryConfig

class SqliteRecoveryConfig (db_dir)

Use SQLite to store recovery data.

Creates a SQLite DB per-worker in a given directory. Multiple DBs are used to allow workers to write without contention.

Use a distinct directory per dataflow so recovery data is not mixed.

>>> from tempfile import TemporaryDirectory
>>> from bytewax.dataflow import Dataflow
>>> from bytewax.execution import run_main
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> flow.capture(StdOutputConfig())
>>> tmp_dir = TemporaryDirectory()  # We'll store this somewhere temporary for this test.
>>> recovery_config = SqliteRecoveryConfig(tmp_dir.name)
>>> run_main(
...     flow,
...     recovery_config=recovery_config,
... )  # doctest: +ELLIPSIS
(...)

DB files and tables will automatically be created if there's no previous recovery data.

Args

db_dir : Path
Existing directory to store per-worker DBs in. Must be distinct per-dataflow. DB files will have names like "worker0.sqlite3". You can use "." for the current directory.

Returns

Config object. Pass this as the recovery_config argument to your execution entry point.

Ancestors

RecoveryConfig

Instance variables

var db_dir

Directory the per-worker recovery DBs are stored in.