Extending Python API
Have you ever wanted to implement particularly weird semantics with your operators? Or maybe support something tricky? Or do you have a fever and the only prescription is more decorators? Well, with this guide you can!
You can define new custom operators in terms of already existing operators. To do this you define an operator function and decorate it with bytewax.dataflow.operator.
from bytewax.dataflow import Stream, operator
import bytewax.operators as op

@operator
def add_to(step_id: str, up: Stream[int], y: int) -> Stream[int]:
    # Build the new operator out of the existing map operator.
    return op.map("shim_map", up, lambda x: x + y)
Each input or output Stream turns into a Port in the resulting operator data model.
To generate the operator data model and nest operators properly, you must follow a few rules when writing your function:
There must be a step_id: str argument, even if not used.
All arguments, the return value, and return dataclass fields that are MultiStreams must have type annotations. We recommend annotating all the arguments, the return value, and all fields in a return dataclass.
You must use a dataclass to return multiple values.
Argument and return dataclass field names must not overlap with the names defined on the
Dataflows must not appear in nested objects: they can be arguments, the return type itself, or top-level fields of a dataclass that is the return type; nowhere else.
A good docstring for a custom operator has a few things:
A one line summary of the operator.
A doctest example using the operator.
For each argument that is a stream, a description of the required shape of that upstream.
For each returned stream, a description of the shape of the data that is sent downstream.