A simple example

Now that we've installed bytewax, let's begin with an end-to-end example. We'll start by building out a simple dataflow that counts the words in a file.

To begin, save a copy of this text in a file called wordcount.txt:

To be, or not to be, that is the question:
Whether 'tis nobler in the mind to suffer
The slings and arrows of outrageous fortune,
Or to take arms against a sea of troubles
And by opposing end them.

And a copy of the code in a file called wordcount.py.

import operator
import re

from datetime import timedelta

from bytewax.dataflow import Dataflow
from bytewax.inputs import ManualInputConfig
from bytewax.outputs import StdOutputConfig
from bytewax.execution import run_main
from bytewax.window import SystemClockConfig, TumblingWindowConfig


def input_builder(worker_index, worker_count, resume_state):
    # resume_state is used for recovery, we can ignore it for now
    resume_state = None
    for line in open("wordcount.txt"):
        yield resume_state, line


def lower(line):
    return line.lower()


def tokenize(line):
    return re.findall(r'[^\s!,.?":;0-9]+', line)


def initial_count(word):
    return word, 1


def add(count1, count2):
    return count1 + count2

clock_config = SystemClockConfig()
window_config = TumblingWindowConfig(length=timedelta(seconds=5))

flow = Dataflow()
flow.input("input", ManualInputConfig(input_builder))
flow.map(lower)
flow.flat_map(tokenize)
flow.map(initial_count)
flow.reduce_window("sum", clock_config, window_config, add)
flow.capture(StdOutputConfig())

run_main(flow)

Running the example

Now that we have our program and our input, we can run our example via python ./wordcount.py and see the completed result:

("'tis", 1)
('a', 1)
('against', 1)
('and', 2)
('arms', 1)
('arrows', 1)
('be', 2)
('by', 1)
('end', 1)
('fortune', 1)
('in', 1)
('is', 1)
('mind', 1)
('nobler', 1)
('not', 1)
('of', 2)
('opposing', 1)
('or', 2)
('outrageous', 1)
('question', 1)
('sea', 1)
('slings', 1)
('suffer', 1)
('take', 1)
('that', 1)
('the', 3)
('them', 1)
('to', 4)
('troubles', 1)
('whether', 1)

Unpacking the program

Now that we've run our first bytewax program, let's walk through the components that we used.

In a dataflow program, each step added to the flow will occur in the order that it is added. For our wordcount dataflow, we'll want the following steps:

  • Take a line from the file
  • Lowercase all characters in the line
  • Split the line into words
  • Count the occurrence of each word in the file
  • Print out the result after all the lines have been processed

We'll start with how to get the input that we'll push through our dataflow.

Take a line from the file

def input_builder(worker_index, worker_count, resume_state):
    # resume_state is used for recovery, we can ignore it for now
    resume_state = None
    for line in open("wordcount.txt"):
        yield resume_state, line

flow = Dataflow()
flow.input("input", ManualInputConfig(input_builder))

To receive input, our program needs an input iterator. We've defined a Python generator that will read our input file. There are also more advanced ways to provide input, which you can read about later when we talk about execution modes.

This generator yields two-tuples of resume_state and a line from our file. The resume_state isn't significant in this example, but we'll talk more about it when we discuss recovery.
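
For illustration, here are the first two items the generator yields (each line read from the file keeps its trailing newline):

(None, 'To be, or not to be, that is the question:\n')
(None, "Whether 'tis nobler in the mind to suffer\n")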

We then initialize a ManualInputConfig with the input_builder function. Now we can introduce our first operator, input, which will generate the data for the flow.

Let's define the steps that we want to execute for each line of input that we receive. We will add these steps to a dataflow object, bytewax.dataflow.Dataflow().

Lowercase all characters in the line

If you look closely at our input, we have instances of both To and to. Let's add a step to our dataflow that transforms each line into lowercase letters. At the same time, we'll introduce the map operator.

def lower(line):
    return line.lower()


flow.map(lower)

For each item that our generator produces, the map operator will call the built-in string method lower() and emit downstream a copy of the string with all of its characters converted to lowercase.
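
For example,

print(lower("To be, or not to be, that is the question:"))

results in:

to be, or not to be, that is the question: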

Split the line into words

Each item that our input generator emits is an entire line from our file. In order to count the words in the file, we'll need to break each line up into individual words.

Enter our tokenize() function, which uses a Python regular expression to split the line of input into a list of words:

def tokenize(line):
    return re.findall(r'[^\s!,.?":;0-9]+', line)

For example,

to_be = "To be, or not to be, that is the question:"
print(tokenize(to_be))

results in:

['To', 'be', 'or', 'not', 'to', 'be', 'that', 'is', 'the', 'question']

To make use of the tokenize() function, we'll use the flat map operator:

flow.flat_map(tokenize)

The flat map operator defines a step which calls a function on each input item. Each word in the list we return from our function will then be emitted downstream individually.
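
To see how this differs from the map operator, here is a rough sketch using the first (already lowercased) line of our text:

tokenize("to be, or not to be, that is the question:")
# map would emit the returned list downstream as a single item:
#   ['to', 'be', 'or', 'not', 'to', 'be', 'that', 'is', 'the', 'question']
# flat_map instead emits each element as its own item:
#   'to', 'be', 'or', 'not', 'to', 'be', 'that', 'is', 'the', 'question'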

Build up counts

At this point in the dataflow, the items of data are the individual words.

Let's skip ahead to the second operator here, reduce window.

def initial_count(word):
    return word, 1


def add(count1, count2):
    return count1 + count2

# Configuration for time-based windows.
clock_config = SystemClockConfig()
window_config = TumblingWindowConfig(length=timedelta(seconds=5))

flow.map(initial_count)
flow.reduce_window("sum", clock_config, window_config, add)

reduce_window's super power is that it can repeatedly combine items into a single aggregate value via a reducing function. Think of it like reducing a sauce while cooking: you are boiling all of the values down to something more concentrated.

In this case, we pass it the reducing function add(), which sums the counts for a word so that the final aggregate value is that word's total.

How does reduce_window know which items to combine? It requires that the input items from the previous step in the dataflow are (key, value) two-tuples, and it makes sure that all values for a given key are passed to the reducing function. Thus, if we make the word the key, we'll get a separate count for each word!
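
To make this concrete, here is a rough pure-Python sketch of what that grouping amounts to for the key 'to', which appears four times in our text (the real operator does this internally):

from functools import reduce

values_for_to = [1, 1, 1, 1]        # one 1 per occurrence of the word 'to'
total = reduce(add, values_for_to)  # add(1, 1) -> 2, add(2, 1) -> 3, add(3, 1) -> 4
print(("to", total))                # prints ('to', 4)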

That explains the previous map step in the dataflow with initial_count().

This map sets up the shape that reduce_window needs: two-tuples where the key is the word, and the value is something we can add together. Since each occurrence of a word should contribute 1 to that word's total count, we pair every word with the value 1 here.
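
For example,

print(initial_count("to"))

results in:

('to', 1)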

How does reduce_window know when to emit combined items? That is what clock_config and window_config are for. SystemClockConfig tells the dataflow to use the system clock to track time. TumblingWindowConfig instructs the flow to close a window at the end of every length period, 5 seconds in our case. reduce_window will emit the accumulated values every 5 seconds, and again once the input is completely consumed. Our small file is read in well under 5 seconds, so all of the counts fall into a single window that is emitted when the input ends.

The last part of our dataflow program will use the capture operator to mark the output of our reduction as the dataflow's final output.

flow.capture(StdOutputConfig())

This means that whatever items are flowing through this point in the dataflow will be passed on as output. We use StdOutputConfig to route our output to the system's standard output.

Running

To run the example, we'll need to introduce one more function, bytewax.execution.run_main():

run_main(flow)

When we call run_main(), our dataflow program begins running: Bytewax reads the input items from our input generator, pushes the data through each step in the dataflow, and routes the captured output to the configured output, in our case standard output.

Here is the complete output when running the example (the exact ordering of the lines may vary from run to run):

('opposing', 1)
('and', 2)
('of', 2)
('end', 1)
('whether', 1)
('arrows', 1)
('that', 1)
('them', 1)
('not', 1)
('by', 1)
('sea', 1)
('arms', 1)
('a', 1)
('is', 1)
('against', 1)
('to', 4)
("'tis", 1)
('nobler', 1)
('take', 1)
('question', 1)
('troubles', 1)
('or', 2)
('slings', 1)
('mind', 1)
('outrageous', 1)
('suffer', 1)
('be', 2)
('in', 1)
('the', 3)
('fortune', 1)

To learn more about possible modes of execution, read our page on execution.
