Migration Guide

This guide can help you upgrade code through breaking changes from one Bytewax version to the next. For a detailed list of all changes, see the CHANGELOG.

From v0.16 to v0.17

Bytewax v0.17 introduces major changes to the way that recovery works in order to support rescaling. In v0.17, the number of workers in a cluster can now be changed by stopping the dataflow execution and specifying a different number of workers on resume.

In addition to rescaling, we've changed the Bytewax inputs and outputs API to support batching, which has yielded significant performance improvements.

In this article, we'll go over the updates we've made to our API and explain the changes you'll need to make to your dataflows to upgrade to v0.17.

Recovery changes

In v0.17, recovery has been changed to support rescaling the number of workers in a dataflow. You now pre-create a fixed number of SQLite recovery DB files before running the dataflow.

SQLite recovery DBs created with versions of Bytewax prior to v0.17 are not compatible with this release.

Creating recovery partitions

Creating recovery stores has been moved to a separate step from running your dataflow.

Recovery partitions must be pre-initialized before the dataflow is run for the first time. This is done by executing this module:

$ python -m bytewax.recovery db_dir/ 4

This will create a set of partitions:

$ ls db_dir/
part-0.sqlite3
part-1.sqlite3
part-2.sqlite3
part-3.sqlite3

Once the recovery partition files have been created, they must be placed in locations that are accessible to the workers. The cluster as a whole must have access to all partitions, but any given worker need not have access to any partition in particular (or any at all). It is OK if a given partition is accessible by multiple workers; only one worker will use it.

If you are not running in a cluster environment but on a single machine, placing all the partitions in a single local filesystem directory is fine.

Although the partition init script will not create these, partitions after execution may consist of multiple files:

$ ls db_dir/
part-0.sqlite3
part-0.sqlite3-shm
part-0.sqlite3-wal
part-1.sqlite3
part-2.sqlite3
part-3.sqlite3
part-3.sqlite3-shm
part-3.sqlite3-wal

You must remember to move all of the files that share a given part-* prefix together.
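As a rough illustration of keeping a partition's files together, the sketch below moves every file that belongs to one partition in a single step. It is a plain standard-library helper, not part of Bytewax; the function name move_partition and its parameters are hypothetical, and it assumes the file naming shown in the listing above.

```python
import shutil
from pathlib import Path


def move_partition(src_dir, dst_dir, index):
    """Move part-<index>.sqlite3 and any -shm / -wal companions together.

    Hypothetical helper: assumes the `part-N.sqlite3*` naming shown above.
    """
    src = Path(src_dir)
    dst = Path(dst_dir)
    dst.mkdir(parents=True, exist_ok=True)
    moved = []
    # The glob matches the main DB file plus its -shm and -wal companions.
    for path in sorted(src.glob(f"part-{index}.sqlite3*")):
        shutil.move(str(path), str(dst / path.name))
        moved.append(path.name)
    return moved
```

Calling move_partition("db_dir", "other_dir", 0) would relocate part-0.sqlite3 together with its -shm and -wal files, if they exist, and leave the other partitions untouched.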

Choosing the number of recovery partitions

An important consideration when choosing the initial number of recovery partitions: the number of recovery partitions is currently fixed, even when you rescale the number of workers.

If you are scaling up the number of workers in your cluster to increase the total throughput of your dataflow, the work of writing recovery data to the recovery partitions will still be limited to the initial number of recovery partitions. If you will likely scale up your dataflow to accommodate increased demand, we recommend that you consider creating more recovery partitions than you will initially need. Having multiple recovery partitions handled by a single worker is fine.
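The trade-off can be put into numbers with a small sketch. It assumes only what is described above: each partition is used by exactly one worker, while one worker may handle several partitions. The function name is hypothetical and this is not a Bytewax API.

```python
def max_recovery_writers(num_partitions, num_workers):
    """Upper bound on the workers that can write recovery data at once.

    Assumes each partition is used by exactly one worker, while a
    single worker may handle several partitions.
    """
    return min(num_partitions, num_workers)


# 4 partitions created up front, later rescaled from 2 to 8 workers.
print(max_recovery_writers(4, 2))  # 2
print(max_recovery_writers(4, 8))  # 4
```

With 4 partitions, scaling to 8 workers still leaves at most 4 of them writing recovery data, which is why over-provisioning partitions up front can pay off.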

Epoch interval -> Snapshot interval

We've renamed the CLI option epoch-interval to snapshot-interval to better describe its effect on dataflow execution. The snapshot interval is the system time duration (in seconds) between snapshots of state for recovery.

A dataflow can only be recovered to the boundary of the most recently completed snapshot across all workers. Be aware that making snapshots more frequent (a shorter snapshot-interval) increases the amount of recovery work and may impact performance.

Backup interval and backing up recovery partitions

We've also introduced an additional parameter to running a dataflow: backup-interval.

When running a Dataflow with recovery enabled, it is recommended to back up your recovery partitions on a regular basis for disaster recovery.

The backup-interval parameter is the length of time to wait before "garbage collecting" older snapshots. This enables a dataflow to successfully resume when backups of recovery partitions happen at different times, which will be true in most distributed deployments.

Set this value longer than the interval within which you can guarantee that all recovery partitions will be backed up, so that transient failures are accounted for. It defaults to one day.

If you attempt to resume from a set of recovery partitions for which the oldest and youngest backups are more than the backup interval apart, the resumed dataflow could have corrupted state.
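A pre-flight check for the condition above might look like the following sketch. It is not part of Bytewax: the function name and the idea of collecting one backup timestamp per partition are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone


def backups_within_interval(backup_times, backup_interval):
    """Return True if the oldest and newest backups are close enough.

    backup_times: one timezone-aware datetime per recovery partition.
    backup_interval: the backup interval as a timedelta.
    """
    spread = max(backup_times) - min(backup_times)
    return spread <= backup_interval


one_day = timedelta(days=1)  # the default mentioned above
base = datetime(2023, 9, 1, tzinfo=timezone.utc)
ok = [base, base + timedelta(hours=3), base + timedelta(hours=20)]
too_far = [base, base + timedelta(hours=30)]

print(backups_within_interval(ok, one_day))  # True
print(backups_within_interval(too_far, one_day))  # False
```

Running a check like this before resuming from restored backups would flag a set of partitions that is too far apart to resume from safely.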

Input and Output changes

In v0.17, we have restructured input and output to support batching for increased throughput.

If you have created custom input connectors, you'll need to update them to use the new API.

Input changes

The list_parts method has been updated to return a List[str] instead of a Set[str], and now should only reflect the available input partitions that a given worker has access to. You no longer need to return the complete set of partitions for all workers.

The next method of StatefulSource and StatelessSource has been renamed to next_batch and should return a List of elements each time it is called. If there are no elements to return, this method should return the empty list.
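The shape of these two methods can be sketched in plain Python. The class below deliberately does not subclass any Bytewax base class, so it only mirrors the return types described above; the names ListSource and batch_size are hypothetical, and how a real source signals the end of its input is not covered here.

```python
from typing import Any, List


class ListSource:
    """Plain-Python stand-in for a source; mirrors only next_batch."""

    def __init__(self, items, batch_size):
        self._items = list(items)
        self._batch_size = batch_size
        self._pos = 0

    def next_batch(self) -> List[Any]:
        # Return up to batch_size items; an empty list means
        # "nothing to emit right now".
        batch = self._items[self._pos : self._pos + self._batch_size]
        self._pos += len(batch)
        return batch


def list_parts(local_parts) -> List[str]:
    # Return a list (not a set) of only the partitions this worker can access.
    return sorted(local_parts)


src = ListSource(["a", "b", "c"], batch_size=2)
print(src.next_batch())  # ['a', 'b']
print(src.next_batch())  # ['c']
print(src.next_batch())  # []
print(list_parts({"part-b", "part-a"}))  # ['part-a', 'part-b']
```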

Next awake

Input sources now have an optional next_awake method which you can use to schedule when the next next_batch call should occur. You can use this to "sleep" the input operator for a fixed amount of time while you are waiting for more input.

The default behavior uses a simple heuristic to prevent a spin loop when there is no input. Always use next_awake rather than using a time.sleep in an input source.

See the periodic_input.py example in the examples directory for an implementation that uses this functionality.
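As a minimal sketch of the idea, the class below tracks when it next wants to be polled and advances that time on every batch instead of sleeping. It is a plain-Python stand-in rather than a real Bytewax source, and it assumes that next_awake returns a timezone-aware datetime; the class name and constructor arguments are hypothetical.

```python
from datetime import datetime, timedelta, timezone


class PeriodicSketch:
    """Stand-in for a source that wants to be polled once per `every`."""

    def __init__(self, every, start):
        self._every = every
        self._next_awake = start
        self._count = 0

    def next_awake(self):
        # Assumption: the runtime will not call next_batch before this time.
        return self._next_awake

    def next_batch(self):
        self._count += 1
        # Schedule the following call one period later instead of sleeping.
        self._next_awake += self._every
        return [self._count]


start = datetime(2023, 1, 1, tzinfo=timezone.utc)
src = PeriodicSketch(timedelta(seconds=10), start)
print(src.next_awake() == start)  # True
print(src.next_batch())  # [1]
print(src.next_awake() - start)  # 0:00:10
```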

Async input adapter

We've included a new bytewax.inputs.batcher_async to help you use async Python libraries in Bytewax input sources. It lets you wrap an async iterator and specify a maximum time and size to collect items into a batch.

Using Kafka for recovery is now removed

v0.17 removes the deprecated KafkaRecoveryConfig as a recovery store to support the ability to rescale the number of workers.

Support for additional platforms

Bytewax is now available for linux/aarch64 and linux/armv7.

From 0.15 to 0.16

Bytewax v0.16 introduces major changes to the way that custom Bytewax inputs are created, as well as improvements to windowing and execution. In this article, we'll cover some of the updates we've made to our API to make upgrading to v0.16 easier.

Input changes

In v0.16, we have restructured input to be based around partitions in order to support rescaling stateful dataflows in the future. We have also dropped the Config suffix to our input classes. As an example, KafkaInputConfig has been renamed to KafkaInput and has been moved to bytewax.connectors.kafka.KafkaInput.

ManualInputConfig is now replaced by two base superclasses that can be subclassed to create custom input sources. They are DynamicInput and PartitionedInput.

DynamicInput

DynamicInput sources are input sources that support reading distinct items from any number of workers concurrently.

DynamicInput sources do not support resume state and thus generally do not provide delivery guarantees better than at-most-once.

PartitionedInput

PartitionedInput sources can keep internal state on the current position of each partition. If a partition can "rewind" and resume reading from a previous position, they can provide delivery guarantees of at-least-once or better.

PartitionedInput sources maintain the state of each source and re-build that state during recovery.

Deprecating Kafka Recovery

In order to better support rescaling Dataflows, the Kafka recovery store option is being deprecated and will be removed in a future release in favor of the SQLite recovery store.

Capture -> Output

The capture operator has been renamed to output in v0.16 and is now stateful, so it requires a step ID:

In v0.15 and before:

flow.capture(StdOutputConfig())

In v0.16+:

from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput

flow = Dataflow()
flow.output("out", StdOutput())

ManualOutputConfig is now replaced by two base superclasses that can be subclassed to create custom output sinks. They are DynamicOutput and PartitionedOutput.

New entrypoint

In Bytewax v0.16, the way that Dataflows are run has been simplified, and most execution functions have been removed.

Similar to other Python frameworks like Flask and FastAPI, Dataflows can now be run using a Dataflow import string in the <module_name>:<dataflow_variable_name_or_factory_function> format. As an example, given a file named dataflow.py with the following contents:

from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput
from bytewax.connectors.stdio import StdOutput

flow = Dataflow()
flow.input("in", TestingInput(range(3)))
flow.output("out", StdOutput())

Since a Python file dataflow.py defines a module dataflow, the Dataflow can be run with the following invocation:

> python -m bytewax.run dataflow
0
1
2

By default, Bytewax looks for a variable in the given module named flow, so we can eliminate the <dataflow_variable_name_or_factory_function> part of our import string.
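The defaulting rule can be illustrated with a few lines of Python. This is only a sketch of the convention described above, not Bytewax's own parser; the function name split_import_str is hypothetical.

```python
def split_import_str(import_str, default_name="flow"):
    """Split '<module>:<name>' and fall back to 'flow' when no name is given.

    Illustration only; this is not Bytewax's own parser.
    """
    module, _, name = import_str.partition(":")
    return module, name or default_name


print(split_import_str("dataflow"))  # ('dataflow', 'flow')
print(split_import_str("dataflow:flow"))  # ('dataflow', 'flow')
```

Both forms resolve to the same module and variable, which is why the shorter invocation works.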

Processes, workers, recovery stores and other options can be configured with command line flags or environment variables. For the full list of options see the --help command line flag:

> python -m bytewax.run --help
usage: python -m bytewax.run [-h] [-p PROCESSES] [-w WORKERS_PER_PROCESS] [-i PROCESS_ID] [-a ADDRESSES] [--sqlite-directory SQLITE_DIRECTORY] [--epoch-interval EPOCH_INTERVAL] import_str

Run a bytewax dataflow

positional arguments:
  import_str            Dataflow import string in the format <module_name>:<dataflow_variable_or_factory> Example: src.dataflow:flow or src.dataflow:get_flow('string_argument')

options:
  -h, --help            show this help message and exit

Scaling:
  You should use either '-p' to spawn multiple processes on this same machine, or '-i/-a' to spawn a single process on different machines

  -p PROCESSES, --processes PROCESSES
                        Number of separate processes to run [env: BYTEWAX_PROCESSES]
  -w WORKERS_PER_PROCESS, --workers-per-process WORKERS_PER_PROCESS
                        Number of workers for each process [env: BYTEWAX_WORKERS_PER_PROCESS]
  -i PROCESS_ID, --process-id PROCESS_ID
                        Process id [env: BYTEWAX_PROCESS_ID]
  -a ADDRESSES, --addresses ADDRESSES
                        Addresses of other processes, separated by semicolumn: -a "localhost:2021;localhost:2022;localhost:2023" [env: BYTEWAX_ADDRESSES]

Recovery:
  --sqlite-directory SQLITE_DIRECTORY
                        Passing this argument enables sqlite recovery in the specified folder [env: BYTEWAX_SQLITE_DIRECTORY]
  --epoch-interval EPOCH_INTERVAL
                        Number of seconds between state snapshots [env: BYTEWAX_EPOCH_INTERVAL]

Porting a simple example from 0.15 to 0.16

This is what a simple example looked like in 0.15:

import operator
import re

from datetime import timedelta, datetime

from bytewax.dataflow import Dataflow
from bytewax.inputs import ManualInputConfig
from bytewax.outputs import StdOutputConfig
from bytewax.execution import run_main
from bytewax.window import SystemClockConfig, TumblingWindowConfig


def input_builder(worker_index, worker_count, resume_state):
    state = None  # ignore recovery
    for line in open("wordcount.txt"):
        yield state, line


def lower(line):
    return line.lower()


def tokenize(line):
    return re.findall(r'[^\s!,.?":;0-9]+', line)


def initial_count(word):
    return word, 1


def add(count1, count2):
    return count1 + count2


clock_config = SystemClockConfig()
window_config = TumblingWindowConfig(length=timedelta(seconds=5))

flow = Dataflow()
flow.input("input", ManualInputConfig(input_builder))
flow.map(lower)
flow.flat_map(tokenize)
flow.map(initial_count)
flow.reduce_window("sum", clock_config, window_config, add)
flow.capture(StdOutputConfig())

run_main(flow)

To port the example to the 0.16 version we need to make a few changes.

Imports

Let's start with the existing imports:

from bytewax.inputs import ManualInputConfig
from bytewax.outputs import StdOutputConfig
from bytewax.execution import run_main

Becomes:

from bytewax.connectors.files import FileInput
from bytewax.connectors.stdio import StdOutput

We removed run_main as it is now only used for unit testing dataflows. Bytewax now has a built-in FileInput connector, which uses the PartitionedInput connector superclass.

Since we are using a built-in connector to read from this file, we can delete our input_builder function above.

Windowing

Most of the operators from v0.15 are the same, but the start_at parameter of windowing functions has been changed to align_to. The start_at parameter incorrectly implied that there were no potential windows before that time. What determines if an item is late for a window is not the windowing definition, but the watermark of the clock.

SlidingWindow and TumblingWindow now require the align_to parameter. Previously, this was filled with the timestamp of the start of the Dataflow, but must now be specified. Specifying this parameter ensures that windows are consistent across Dataflow restarts, so make sure that you don't change this parameter between executions.

The old TumblingWindow definition:

clock_config = SystemClockConfig()
window_config = TumblingWindowConfig(length=timedelta(seconds=5))

becomes:

clock_config = SystemClockConfig()
window_config = TumblingWindow(
    length=timedelta(seconds=5), align_to=datetime(2023, 1, 1, tzinfo=timezone.utc)
)
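To see why align_to does not imply a "first" window, consider the sketch below. It assumes tumbling window boundaries sit at align_to plus any whole multiple of length, including negative multiples; the helper name is hypothetical and this is not how Bytewax itself is implemented.

```python
from datetime import datetime, timedelta, timezone


def tumbling_window_start(ts, align_to, length):
    """Start of the tumbling window containing ts.

    Assumes window boundaries sit at align_to + k * length for any
    integer k, including negative k (windows before align_to).
    """
    k = (ts - align_to) // length  # floor division, so k may be negative
    return align_to + k * length


align_to = datetime(2023, 1, 1, tzinfo=timezone.utc)
length = timedelta(seconds=5)

after = tumbling_window_start(align_to + timedelta(seconds=12), align_to, length)
before = tumbling_window_start(align_to - timedelta(seconds=1), align_to, length)
print(after - align_to)  # 0:00:10
print(align_to - before)  # 0:00:05
```

Because the boundaries depend only on align_to and length, restarting the dataflow with the same values yields the same windows.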

Output and execution

Similarly to the input, the output configuration is now stateful. capture has been renamed to output, and now takes a name, as all stateful operators do.

So we move from this:

flow.capture(StdOutputConfig())

To this:

flow.output("out", StdOutput())

Complete code

The complete code for the new simple example now looks like this:

import operator
import re

from datetime import timedelta, datetime, timezone

from bytewax.dataflow import Dataflow
from bytewax.connectors.files import FileInput
from bytewax.connectors.stdio import StdOutput
from bytewax.window import SystemClockConfig, TumblingWindow


def lower(line):
    return line.lower()


def tokenize(line):
    return re.findall(r'[^\s!,.?":;0-9]+', line)


def initial_count(word):
    return word, 1


def add(count1, count2):
    return count1 + count2


clock_config = SystemClockConfig()
window_config = TumblingWindow(
    length=timedelta(seconds=5), align_to=datetime(2023, 1, 1, tzinfo=timezone.utc)
)

flow = Dataflow()
flow.input("inp", FileInput("examples/sample_data/wordcount.txt"))
flow.map(lower)
flow.flat_map(tokenize)
flow.map(initial_count)
flow.reduce_window("sum", clock_config, window_config, add)
flow.output("out", StdOutput())

Running our completed Dataflow looks like this, as we've named our Dataflow variable flow in a file named dataflow.py:

> python -m bytewax.run dataflow:flow
('whether', 1)
("'tis", 1)
('of', 2)
('opposing', 1)
...

We can even run our sample Dataflow on multiple workers to process the file in parallel:

> python -m bytewax.run dataflow:flow -p2
('whether', 1)
("'tis", 1)
('of', 2)
('opposing', 1)
...

In the background, Bytewax has spawned two processes, each of which is processing a part of the file. To see this more clearly, you can start each worker by hand:

In one terminal, run:

> python -m bytewax.run dataflow:flow -i0 -a "localhost:2101;localhost:2102"
('not', 1)
('end', 1)
('opposing', 1)
('take', 1)
...

And in a second terminal, run:

> python -m bytewax.run dataflow:flow -i1 -a "localhost:2101;localhost:2102"
('question', 1)
('the', 3)
('troubles', 1)
('fortune', 1)
...

From 0.10 to 0.11

Bytewax 0.11 introduces major changes to the way that Bytewax dataflows are structured, as well as improvements to recovery and windowing. This document outlines the major changes between Bytewax 0.10 and 0.11.

Input and epochs

Bytewax is built on top of the Timely Dataflow framework. The idea of timestamps (which we refer to in Bytewax as epochs) is central to Timely Dataflow's progress tracking mechanism.

Bytewax initially adopted an input model that included managing the epochs at which input was introduced. The 0.11 version of Bytewax removes the need to manage epochs directly.

Epochs continue to exist in Bytewax, but are now managed internally to represent a unit of recovery. Bytewax dataflows that are configured with recovery will snapshot their state after processing all items in an epoch. In the event of recovery, Bytewax will resume a dataflow at the last snapshotted state. The frequency of snapshotting can be configured with an EpochConfig.

Recoverable input

Bytewax 0.11 will now allow you to recover the state of the input to your dataflow.

Manually constructed input functions, like those used with ManualInputConfig, now take a third argument. If your dataflow is interrupted, the third argument passed to your input function can be used to reconstruct the state of your input at the last recovery snapshot, provided you write your input logic accordingly. The input_builder function must return a tuple of (resume_state, datum).
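A resumable input function might be sketched as follows. It uses an in-memory list as a stand-in for a file so that it runs on its own, and it assumes that the state yielded with each item is the position to resume from after that item; treat that convention, and the LINES data, as illustrative assumptions.

```python
LINES = ["to be", "or not", "to be", "that is"]  # stand-in for a file


def input_builder(worker_index, worker_count, resume_state):
    # resume_state is None on a fresh start; otherwise it is the state
    # recorded at the last recovery snapshot.
    next_line = resume_state or 0
    for i in range(next_line, len(LINES)):
        # Assumption: the yielded state is where to resume *after* this item.
        yield i + 1, LINES[i]


print(list(input_builder(0, 1, None))[:2])  # [(1, 'to be'), (2, 'or not')]
print(list(input_builder(0, 1, 2)))  # [(3, 'to be'), (4, 'that is')]
```

The second call shows a resume: given a state of 2, the function skips the two lines that were already emitted.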

Bytewax's built-in input handlers, like KafkaInputConfig, are also recoverable. KafkaInputConfig will store information about consumer offsets in the configured Bytewax recovery store. In the event of recovery, KafkaInputConfig will start reading from the offsets that were last committed to the recovery store.

Stateful windowing

Version 0.11 also introduces stateful windowing operators, including a new fold_window operator.

Previously, Bytewax included helper functions to manage windows in terms of epochs. Now that Bytewax manages epochs internally, windowing functions are now operators that appear as a processing step in a dataflow. Dataflows can now have more than one windowing step.

Bytewax's stateful windowing operators are now built on top of its recovery system, and their operations can be recovered in the event of a failure. See the documentation on recovery for more information.

Output configurations

The 0.11 release of Bytewax adds some prepackaged output configuration options for common use-cases:

ManualOutputConfig, which calls a Python callback function for each output item.

StdOutputConfig, which prints each output item to stdout.

Import path changes and removed entrypoints

In Bytewax 0.11, the overall Python module structure has changed, and some execution entrypoints have been removed.

  • cluster_main, spawn_cluster, and run_main have moved to bytewax.execution
  • Dataflow has moved to bytewax.dataflow
  • run and run_cluster have been removed

Porting the Simple example from 0.10 to 0.11

This is what the Simple example looked like in 0.10:

import re

from bytewax import Dataflow, run


def file_input():
    for line in open("wordcount.txt"):
        yield 1, line


def lower(line):
    return line.lower()


def tokenize(line):
    return re.findall(r'[^\s!,.?":;0-9]+', line)


def initial_count(word):
    return word, 1


def add(count1, count2):
    return count1 + count2


flow = Dataflow()
flow.map(lower)
flow.flat_map(tokenize)
flow.map(initial_count)
flow.reduce_epoch(add)
flow.capture()


for epoch, item in run(flow, file_input()):
    print(item)

To port the example to the 0.11 version we need to make a few changes.

Imports

Let's start with the existing imports:

from bytewax import Dataflow, run

Becomes:

from bytewax.dataflow import Dataflow
from bytewax.execution import run_main

We moved from run to run_main as the execution API has been simplified, and we can now just use the run_main function to execute our dataflow.

Input

The way bytewax handles input changed with 0.11. input is now a proper operator on the Dataflow, and the function now takes 3 parameters: worker_index, worker_count, resume_state. This allows us to distribute the input across workers, and to handle recovery if we want to. We are not going to do that in this example, so the change is minimal.

The input function goes from:

def file_input():
    for line in open("wordcount.txt"):
        yield 1, line

to:

def input_builder(worker_index, worker_count, resume_state):
    state = None  # ignore recovery
    for line in open("wordcount.txt"):
        yield state, line

So instead of manually yielding the epoch in the input function, we can either ignore it (passing None as state), or handle the value to implement recovery (see the recovery chapter).
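Although this example does not distribute input, the worker_index and worker_count parameters make it possible. One way to do so is sketched below with a simple round-robin split; the in-memory LINES list stands in for a file, and the round-robin scheme is an illustrative choice rather than something Bytewax prescribes.

```python
LINES = ["a", "b", "c", "d", "e"]  # stand-in for the lines of a file


def input_builder(worker_index, worker_count, resume_state):
    state = None  # ignore recovery, as in the example above
    for i, line in enumerate(LINES):
        # Round-robin: each worker keeps only "its" lines.
        if i % worker_count == worker_index:
            yield state, line


print([line for _, line in input_builder(0, 2, None)])  # ['a', 'c', 'e']
print([line for _, line in input_builder(1, 2, None)])  # ['b', 'd']
```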

Then we need to wrap the input_builder with ManualInputConfig, give it a name ("file_input" here) and pass it to the input operator (rather than the run function):

from bytewax.inputs import ManualInputConfig


flow.input("file_input", ManualInputConfig(input_builder))

Operators

Most of the operators are the same, but there is a notable change in the flow: where we used reduce_epoch we are now using reduce_window. Since the epochs concept is now considered an internal detail in bytewax, we need to define a way to let the reduce operator know when to close a specific window. Previously this was done every time the epoch changed, while now it can be configured with a time window. We need two config objects to do this:

  • clock_config
  • window_config

The clock_config tells the window-based operators what reference clock to use; here we use SystemClockConfig, which just uses the system's clock. The window_config defines the time window we want to use. Here we'll use TumblingWindowConfig, which gives us tumbling windows defined by a length (timedelta), and we configure it to have windows of 5 seconds each.

So the old reduce_epoch:

flow.reduce_epoch(add)

becomes reduce_window:

from datetime import timedelta

from bytewax.window import SystemClockConfig, TumblingWindowConfig


clock_config = SystemClockConfig()
window_config = TumblingWindowConfig(length=timedelta(seconds=5))
flow.reduce_window("sum", clock_config, window_config, add)

Output and execution

Similarly to the input, the output configuration is now part of an operator, capture. Rather than collecting the output in a Python iterator and then manually printing it, we can now configure the capture operator to print to standard output.

Since all the input and output handling is now defined inside the Dataflow, we don't need to pass this information to the execution method.

So we move from this:

flow.capture()

for epoch, item in run(flow, file_input()):
    print(item)

To this:

from bytewax.outputs import StdOutputConfig


flow.capture(StdOutputConfig())

run_main(flow)

Complete code

The complete code for the new simple example now looks like this:

import operator
import re

from datetime import timedelta, datetime

from bytewax.dataflow import Dataflow
from bytewax.inputs import ManualInputConfig
from bytewax.outputs import StdOutputConfig
from bytewax.execution import run_main
from bytewax.window import SystemClockConfig, TumblingWindowConfig


def input_builder(worker_index, worker_count, resume_state):
    state = None  # ignore recovery
    for line in open("wordcount.txt"):
        yield state, line


def lower(line):
    return line.lower()


def tokenize(line):
    return re.findall(r'[^\s!,.?":;0-9]+', line)


def initial_count(word):
    return word, 1


def add(count1, count2):
    return count1 + count2


clock_config = SystemClockConfig()
window_config = TumblingWindowConfig(length=timedelta(seconds=5))

flow = Dataflow()
flow.input("input", ManualInputConfig(input_builder))
flow.map(lower)
flow.flat_map(tokenize)
flow.map(initial_count)
flow.reduce_window("sum", clock_config, window_config, add)
flow.capture(StdOutputConfig())

run_main(flow)