Execution

Bytewax allows a dataflow program to be run using multiple processes and/or threads, allowing you to scale your dataflow to take advantage of multiple cores on a single machine or multiple machines over the network.

A worker is a thread that is helping execute your dataflow. Workers can be grouped into separate processes, but refer to the individual threads within.

Bytewax's execution model uses identical workers. Workers execute all steps in a dataflow and automatically trade data to ensure the semantics of the operators. If a dataflow is run on multiple processes, there will be a slight overhead added to aggregating operators due to pickling and network communication, but it will allow you to read from input partitions in parallel for higher throughput.

Achieving the best performance of your dataflows requires considering the properties of your logic, input, and output. We'll give an overview and some hints here.

Bytewax gives you a script that can be used to run dataflows:

python -m bytewax.run --help

Selecting the dataflow

The first argument passed to the script is a dataflow getter string. The string is in the format <dataflow-module>:<dataflow-getter>.

  • <dataflow-module> points to a python module containing the dataflow definition
  • <dataflow-getter> is either the name of a python variable holding the flow, or a function call to a function defined in the module

Let's see two examples (we assume bytewax is correctly installed).

Write your dataflow to a file named ./simple.py:

# ./simple.py
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput
from bytewax.connectors.stdio import StdOutput

flow = Dataflow()
flow.input("inp", TestingInput(range(3)))
flow.map(lambda item: item + 1)
flow.output("out", StdOutput())

To run this flow, you should use simple:flow:

python -m bytewax.run simple:flow

In some cases you might want to change the Dataflow's behaviour without having to change the source code.

What if you want to test the previous Dataflow with different input ranges?

You can rewrite the file so that the Dataflow is built from a function, rather than being defined in the file:

# ./parametric.py
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput
from bytewax.connectors.stdio import StdOutput


def get_flow(input_range):
    flow = Dataflow()
    flow.input("inp", TestingInput(range(input_range)))
    flow.map(lambda item: item + 1)
    flow.output("out", StdOutput())
    return flow

And you can run it with:

$ python -m bytewax.run "parametric:get_flow(10)"

The Dataflow can run in different ways, depending on the arguments you pass to the entrypoint.

Let's explore them!

Single Worker Run

By default bytewax.run will run your dataflow on a single worker in the current process. This avoids the overhead of setting up communication between workers/processes, but the dataflow will not have any gain from parallelization.

As you saw in the previous chapter, you just need to pass the Dataflow getter string:

$ python -m bytewax.run simple:flow

Bytewax exposes the internal functions used to run the dataflow instance, but those should only be used in a testing/experimental setup:

# At the end of the file:
from bytewax.testing import run_main

run_main(flow)
1
2
3

Local Cluster

By changing the -p/--processes and -w/--workers-per-process arguments, you can spawn multiple processes, and mulitple workers per process, letting bytewax.run handle the communication between them.

For example you can run the previous dataflow with 2 processes, and 3 workers per process, for a total of 6 workers using the exact same file, changing only the command:

$ python -m bytewax.run -p2 -w3 simple:flow

Manually Handled Cluster

If you want to run single processes on possibly different machines on the same network, you can use the -i/--process-id,-a/--addresses parameters.

It allows you to start up a single process within a cluster of processes that you are manually coordinating. We recommend you checkout the documentation for waxctl our command line tool which facilitates running a dataflow on Kubernetes.

The -a/--addresses parameter represents a list of addresses for all the processes, separated by a ';'. When you run single processes separately, you need to assign a unique id to each process. The -i/--process-id should be a number starting from 0 representing the position of its respective address in the list passed to -a.

For example you want to run 2 processes, with 3 workers each, on two different machines. The machines are known in the network as cluster_one and cluster_two. You should run the first process on cluster_one as follows:

$ python -m bytewax.run simple:flow -w3 -i0 -a "cluster_one:2101;cluster_two:2101"

And on the cluster_two machine as:

$ python -m bytewax.run simple:flow -w3 -i1 -a "cluster_one:2101;cluster_two:2101"

This is only needed if you want to run the dataflow on multiple machines, or if you need better control of the addresses/ports used by default by the run script.

In this article