Agentic Streaming Pipelines: How to package your Bytewax Dataflows to be used by an LLM
The introduction of large language models (LLMs) has revolutionized how we interact with technology. Through API calls, we can have conversations with these models and even write and execute code - simply by making API calls! One of the ways developers leverage LLMs in more complex systems is through "agents". While the concept of an agent may seem complex, at its core, it refers to any system that uses an LLM’s output to perform tasks on a user’s behalf, rather than merely repeating it back. In this blog, we will focus on the agentic design pattern "tool calling" and will provide practical examples to help you package your Bytewax dataflows so they can be used as tools by an LLM.
Understanding agents and agentic applications
An agent is a system or program that utilizes a large language model (LLM) to perform tasks or make decisions on behalf of a user, rather than simply responding with the generated text. In other words, an agent takes the responses from an LLM and uses them to trigger actions, execute code, or interact with external tools or systems to accomplish a goal or fulfill a request. Agents go beyond basic chatbot functionality by using LLM-generated outputs to control real-world processes or execute tasks in a more dynamic and functional manner.
An agentic application refers to a software application or system built around the concept of an agent. It involves the integration of LLM-powered agents that can interact with external tools, APIs, or data systems to carry out tasks on behalf of the user. In an agentic application, the LLM isn’t just generating text; it’s actively participating in decision-making and action execution, often by selecting and using prebuilt tools or invoking specific functions to address user queries or needs. These applications rely on the capability of agents to autonomously perform tasks, enhancing efficiency and automating complex workflows.
Successful agents rely on two key types of reasoning: evaluation and planning, and tool use. Evaluation and planning involve breaking down tasks iteratively, using techniques like Chain-of-Thought and ReAct to adjust approaches as needed. Tool use focuses on the agent's ability to effectively interact with its environment by selecting and executing the right tools to retrieve data, run code, or call APIs, with an emphasis on proper execution rather than reflecting on the results. Let's dive into function calling.
An introduction to function calling
Function calling is a fundamental concept in programming where a program invokes a specific function to execute a defined task. In the context of agent-based systems using large language models (LLMs), function calling allows an agent to trigger predefined operations or access external resources, such as databases or APIs, based on the model’s output. This enables agents to perform tasks autonomously, beyond simply generating text, by leveraging tools and functions to interact with the environment and execute complex workflows. Function calling serves as the bridge between the model’s reasoning capabilities and real-world actions.
As an example, OpenAI enables function calling through their API endpoints. Here is an example from their documentation.
Let's break it down.
from openai import OpenAI
client = OpenAI()
tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string"}
},
},
},
}
]
completion = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "What's the weather like in Paris today?"}],
tools=tools,
)
print(completion.choices[0].message.tool_calls)
In this example get_weather
is a user defined function that takes as input a location
as a string. The function can be defined in a variety of ways, for instance by making an API call to a service that returns weather information.
def get_weather(location: str):
# define code to get weather given a location name
pass
The function can then be defined as a tool
using a JSON schema specifying the name of the function and the parameters it requires. Using OpenAI's chat.completions
entrypoint from their OpenAI
client, the LLM can receive as input a question in natural language such as "what is the weather like in Paris today".
[
{
"id": "call_12345xyz",
"type": "function",
"function": { "name": "get_weather", "arguments": "{'location':'Paris'}" }
}
]
When we run completion.choices[0].message.tool_calls
this returns a list containing the parameters that can be used to call the function. We as a user are responsible for how the function is then executed.
Source: https://platform.openai.com/docs/guides/function-calling
We'll take a look at two frameworks that enable function execution in addition to function calling: LangChain and Haystack.
Enabling function execution in Python
In this section we will study the similarities and differences between two frameworks that enable tool calling and tool execution: LangChain and Haystack by deepset. To enable agentic applications through tool calling they both require:
- Define a function
- Define a function schema
Both LangChain and Haystack refer to functions used by LLMs as "tools" - in this article we will use these terms interchangeably. The schemas are in a sense a way to annotate and describe that the tool does, the arguments it takes and the type of data structure of the arguments passed.
In LangChain, tool schemas can be passed as Python functions (where the schema includes typehinting and docstrings), Pydantic models, TypedDict classes or LangChain tool Objects.
In Haystack, tool schemas include typehinting and docstrings, and incorporate a tool definition that adheres to OpenAI's JSON tool definition schema.
As a simple example, here is a function definition with typehinting and a docstring:
def add(a: int, b: int) -> int:
"""Add two integers.
Args:
a: First integer
b: Second integer
"""
return a + b
We can then define ouradd
function as a tool. In LangChain, this is done through the bind_tools
method associated with a LangChain call of a model such as OpenAI. In Haystack this is done by defining the function as a Tool
object and using a ToolInvoker
to execute the tools when prompted. Let's now look at how we can package a Bytewax dataflow for function calling. We'll focus on a Haystack implementation but note that the packaging can be used in either framework.
Packaging a Bytewax dataflow for function calling
A Bytewax dataflow is a directed acyclic graph whose nodes consist of operators. The dataflow is initialized by the input operator - this first step enables reading the data from a source such as a file, a database or a distributed data streaming technology such as Kafka. Next operators are applied. Each operator is a node in the DAG applying some operation on the data such as data filtering, merging, or aggregation. Once the data has been processed it can be stored or served through the use of sink operators.
This is what a simple dataflow looks like. In this dataflow we are taking a batch of dictionaries with keys "id"
and "name"
, populating them into a DuckDB instance. Here is more information on our DuckDB output operator, To learn more about input and output operators supported at this time, refer to our blog on modules.
import bytewax.duckdb.operators as duck_op
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import run_main
flow = Dataflow("duckdb")
def create_dict(value: int) -> Tuple[str, Dict[str, Union[int, str]]]:
return ("1", {"id": value, "name": "Alice"})
inp = op.input("inp", flow, TestingSource(range(50)))
dict_stream = op.map("dict", inp, create_dict)
duck_op.output(
"out",
dict_stream,
"sample.duckdb",
"example_table",
"CREATE TABLE IF NOT EXISTS example_table (id INTEGER, name TEXT)",
)
run_main(flow)
We can package out dataflow into a function that takes, as an example, the DuckDB file name and table name:
def populate_duck_db(duck_db_instance: str, table_name: str):
"""
This function populates a DuckDB instance with a table containing
the numbers 0 through 49 and the name Alice
Args:
duck_db_instance: Name of duckdb file to populate
table_name: table name where the data will be stored
"""
flow = Dataflow("duckdb")
def create_dict(value: int) -> Tuple[str, Dict[str, Union[int, str]]]:
return ("1", {"id": value, "name": "Alice"})
inp = op.input("inp", flow, TestingSource(range(50)))
dict_stream = op.map("dict", inp, create_dict)
duck_op.output(
"out",
dict_stream,
duck_db_instance,
table_name,
f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER, name TEXT)",
)
run_main(flow)
We can further define a schema for it as follows (if using OpenAI or Haystack):
parameters={
"type": "object",
"properties": {
"duck_db_instance": {
"type": "string",
"description": "Name of duckdb file to populate"
},
"table_name": {
"type": "string",
"description": "table name where the data will be stored"
}
}
}
That's it! Let's look at an example using multiple dataflows as tools.
Mini example
Here is a Colab gist you can try.
In this example we are processing smoothie orders and have created two dataflows:
- One dataflow will enrich the data by adding prices and computing the total revenue per type of smoothie
- A second dataflow that will identify gaps of a given duration (in seconds) between orders placed.
Each of the dataflows is defined as a tool, along with a schema. We can then provide instructions in natural language and the LLM will execute the appropriate tool.
Tool Definition
total_revenue_per_order_tool = Tool(name='enrich_order_dataflow',
description="Enrich smoothie order data and compute total revenue per smoothie type with a given tax rate",
function=enrich_prices_dataflow,
parameters={
"type": "object",
"properties": {
"file_name": {
"type": "string",
"description": "Name of data file with smoothie orders to enrich"
},
"tax_rate": {
"type": "number",
"description": "Tax rate to apply to total revenue as a decimal number"
}
}
})
session_windowing_tool = Tool(name='session_windowing_dataflow',
description='Apply session window to a stream of smoothie orders to identify gaps of a given duration',
function=session_windowing_dataflow,
parameters={
"type": "object",
"properties": {
"file_name": {
"type": "string",
"description": "Name of data file with smoothie orders to process"
},
"session_gap": {
"type": "number",
"description": "Gap duration in between orders. The gap is specified in seconds"
}
}
}
)
Adding the tools to our OpenAI Instance
from haystack_experimental.components.generators.chat import OpenAIChatGenerator
chat_generator = OpenAIChatGenerator(model='gpt-4o-mini',
api_key=Secret.from_token(OPENAI_API_KEY),
tools=[total_revenue_per_order_tool, session_windowing_tool]
)
Asking a question in natural language
message = "Identify gaps in the order with a duration of 7 minutes in smoothie_orders.csv"
res = chat_generator.run([ChatMessage.from_user(message)], tools=[total_revenue_per_order_tool,\
session_windowing_tool])
Invoking the tools and execute the dataflow (the LLM chooses the appropriate tool to run)
from haystack_experimental.components.tools import ToolInvoker
tool_invoker = ToolInvoker(tools=[total_revenue_per_order_tool,\
session_windowing_tool], raise_on_failure=False)
tool_invoker.run(res['replies'])
windowing_operators_examples.output: 'Session 0 (08:00 - 08:15): Order ID [1, 2, 3, 4]'
windowing_operators_examples.output: 'Session 1 (08:23 - 08:23): Order ID [5]'
windowing_operators_examples.output: 'Session 2 (08:31 - 09:28): Order ID [6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18]'
windowing_operators_examples.output: 'Session 3 (09:36 - 10:20): Order ID [19, 20, 21, 22, 23, 24, 25, 26, 27]'
windowing_operators_examples.output: 'Session 4 (10:28 - 10:33): Order ID [28, 29]'
windowing_operators_examples.output: 'Session 5 (10:41 - 10:48): Order ID [30, 31]'
windowing_operators_examples.output: 'Session 6 (10:56 - 11:28): Order ID [32, 33, 34, 35, 36, 37, 38]'
windowing_operators_examples.output: 'Session 7 (11:36 - 11:48): Order ID [39, 40, 41]'
windowing_operators_examples.output: 'Session 8 (11:56 - 12:29): Order ID [42, 43, 44, 45, 46, 47, 48, 49, 50]'
{'tool_messages': [ChatMessage(_role=<ChatRole.TOOL: 'tool'>, _content=[ToolCallResult(result='None', origin=ToolCall(tool_name='session_windowing_dataflow', arguments={'file_name': 'smoothie_orders.csv', 'session_gap': 420}, id='call_LychKF4EgCkESZwkielubLjZ'), error=False)], _meta={})]}
Let's ask another question
message = "Total revenue in smoothie orders in smoothie_orders.csv"
res = chat_generator.run([ChatMessage.from_user(message)], tools=[total_revenue_per_order_tool,\
session_windowing_tool])
tool_invoker.run(res['replies'])
The file 'smoothie_orders.csv' exists.
init_smoothie.total-revenue: ('Berry Blast', 49.06)
init_smoothie.total-revenue: ('Citrus Zing', 75.49)
init_smoothie.total-revenue: ('Green Machine', 51.75)
init_smoothie.total-revenue: ('Mocha Madness', 69.03)
init_smoothie.total-revenue: ('Morning Glow', 5.93)
init_smoothie.total-revenue: ('Nutty Delight', 18.34)
init_smoothie.total-revenue: ('Protein Power', 87.38)
init_smoothie.total-revenue: ('Tropical Twist', 40.45)
{'tool_messages': [ChatMessage(_role=<ChatRole.TOOL: 'tool'>, _content=[ToolCallResult(result='None', origin=ToolCall(tool_name='enrich_order_dataflow', arguments={'file_name': 'smoothie_orders.csv', 'tax_rate': 0.08}, id='call_Rs3L3ko72HFIjSmu8NLeLSAd'), error=False)], _meta={})]}
Conclusion
In this post, we've explored how to package Bytewax dataflows into function calls that can be leveraged by LLM-powered agents. By defining functions and tool schemas, we enable agents to execute complex workflows autonomously, enriching data and processing streams based on user queries. We also demonstrated how frameworks like LangChain and Haystack facilitate function calling and execution, helping agents interact with external resources effectively. With the example of smoothie order processing, we showed how agents can use multiple dataflows as tools to carry out tasks like identifying session gaps and calculating revenue. By integrating these capabilities, we can build powerful agentic applications that automate data-driven processes and make intelligent, real-time decisions.
Stay updated with our newsletter
Subscribe and never miss another blog post, announcement, or community event.