Building High-Performance APIs with Haystack, Bytewax and FastAPI: A RAG App Case Study

By Laura Funderburk

Retrieval-Augmented Generation (RAG) is gaining prominence as a method to enhance the capabilities of large language models by incorporating external data. While RAG workflows are typically associated with offline batch processing or single-query tools, there is a growing need to make these workflows available as scalable APIs for real-time applications.

This article explores how to build high-performance APIs for RAG workflows using Bytewax and FastAPI. Bytewax, with its distributed dataflow capabilities, excels in processing streaming data, making it a strong choice for indexing operations in RAG applications. Meanwhile, FastAPI provides a robust and easy-to-use framework for designing RESTful APIs. Combining these tools allows you to package a RAG pipeline into an efficient, queryable API endpoint that can handle complex data processing and querying in real time.

In this case study, we’ll guide you through implementing such a solution, including data ingestion with Bytewax, in-memory storage for low-latency retrieval, and FastAPI for exposing endpoints. By the end, you will have a clear framework for integrating RAG pipelines into your application stack with performance and scalability in mind.

Introduction

Retrieval-Augmented Generation (RAG) workflows have emerged as a powerful approach for enhancing language models with external data. By combining retrieval mechanisms with generative AI, RAG enables applications to ground their responses in real-time, contextually relevant information. This capability is critical for applications like chatbots, question-answering systems, and document summarization, where relevance and accuracy are paramount.

Despite its promise, operationalizing RAG workflows as APIs presents challenges. These include efficiently indexing large datasets, ensuring low-latency retrieval, and maintaining scalability under variable workloads. Traditional batch-processing methods often fall short when real-time responsiveness is required, necessitating a more dynamic and scalable approach.

This is where Bytewax and FastAPI come in. Bytewax offers a distributed dataflow framework that simplifies complex data processing tasks, making it an excellent choice for indexing in RAG pipelines. FastAPI, on the other hand, provides a performant and developer-friendly framework for building and exposing RESTful APIs. By combining these tools, you can create high-performance APIs that bring the power of RAG workflows to real-time applications.

Understanding the Components

What is Haystack?

Haystack is an open-source framework designed to facilitate the development of search systems and Retrieval-Augmented Generation (RAG) workflows. It provides tools to create robust, flexible pipelines for tasks such as document retrieval, question answering, summarization, and conversational AI. Haystack is particularly suited for integrating external data into AI-driven applications, enhancing their accuracy and relevance.

Haystack supports a variety of document stores, such as Elasticsearch, OpenSearch, and vector-based systems like Weaviate, for managing knowledge bases. It offers tools for both sparse and dense retrieval, leveraging embedding-based methods like DPR and SentenceTransformers. Its modular architecture allows components such as retrievers, readers, and preprocessors to be combined seamlessly into customizable workflows. Additionally, Haystack integrates with large language models (LLMs) to power RAG capabilities.
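
To make this modularity concrete, here is a minimal sketch of a Haystack pipeline that chains a cleaner into a splitter. The component names and sample document are illustrative, not part of the case study:

# A minimal Haystack pipeline: clean a document, then split it into sentences.
from haystack import Document, Pipeline
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter

pipeline = Pipeline()
pipeline.add_component("cleaner", DocumentCleaner())
pipeline.add_component("splitter", DocumentSplitter(split_by="sentence", split_length=1))
pipeline.connect("cleaner", "splitter")

docs = [Document(content="Haystack is modular.  Components connect explicitly.  Pipelines are reusable.")]
result = pipeline.run({"cleaner": {"documents": docs}})
print(len(result["splitter"]["documents"]))  # 3 sentence-level chunks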

What is Bytewax?

Bytewax is a Python-based framework for distributed data processing. Designed to handle real-time and stateful dataflows, Bytewax excels in scenarios that require parallel processing and low-latency data handling. Its programming model is inspired by Apache Beam and Timely Dataflow, making it suitable for use cases like data transformation, stream processing, and indexing.

In the context of RAG pipelines, Bytewax can be used to process and index incoming data streams efficiently. By leveraging its distributed architecture, you can ensure that indexing scales with your data volume and remains performant even under high loads. Its ability to maintain state across workflows is particularly useful for dynamic indexing and processing tasks required in RAG systems.
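
As a minimal point of reference before the full indexing dataflow later in this article, the sketch below wires together a Bytewax input, a map step, and an output; TestingSource and the step names are illustrative:

# A minimal Bytewax dataflow: read test items, transform them, print them.
import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource, run_main

flow = Dataflow("hello-bytewax")
inp = op.input("inp", flow, TestingSource(["alpha", "beta", "gamma"]))
upper = op.map("upper", inp, str.upper)  # runs once per item in the stream
op.output("out", upper, StdOutSink())

run_main(flow)  # execute the dataflow in the current process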

What is FastAPI?

FastAPI is a modern, fast (hence the name), and highly intuitive framework for building APIs with Python. Built on top of Starlette for the web layer and Pydantic for data validation, FastAPI combines ease of use with high performance. It natively supports asynchronous programming, allowing developers to build APIs that can handle a large number of simultaneous requests.

For RAG applications, FastAPI serves as the bridge between users and the underlying data processing pipelines. Its simplicity makes it easy to design and expose endpoints for tasks like indexing and querying. FastAPI’s built-in features for input validation, error handling, and documentation generation further streamline API development.
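
For readers new to FastAPI, a minimal service looks like the sketch below; the endpoint path and model name are illustrative:

# A minimal FastAPI app: one POST endpoint with Pydantic input validation.
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class EchoRequest(BaseModel):
    message: str

@app.post("/echo/")
async def echo(request: EchoRequest):
    # Pydantic has already validated the request body at this point
    return {"echo": request.message}

# Run with: uvicorn main:app --reload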

RAG Pipelines in Context

RAG pipelines enhance large language models by providing them with access to external data sources, improving the relevance and factuality of their outputs. These pipelines typically consist of two main components:

  • Indexing: chunks, embeds, and stores external information in a knowledge base.
  • Retrieval: fetches the most relevant information from that knowledge base in response to a query.

A robust RAG workflow requires both efficient indexing of the knowledge base and fast, contextually aware querying. In real-time applications, this involves continuous updates to the knowledge base and the ability to handle dynamic queries. Challenges arise in ensuring that both components are performant and that the workflow remains scalable as data volume and query complexity increase.

By combining Bytewax for data processing and FastAPI for API delivery, you can create a system that efficiently indexes and retrieves data while exposing these capabilities through a user-friendly API. This integration allows developers to operationalize RAG pipelines as scalable and responsive APIs, unlocking their potential for real-world applications.

Case study introduction

We will be working with a dataset of news articles focused on financial updates, such as analyst ratings, price target adjustments, and market movements, each tagged with relevant stock symbols. For example, one article discusses Piper Sandler's updated price target for Virtus Investment Partners (NYSE: VRTS), while another highlights Faraday Future's (FFIE) strategy announcements. To enhance data relevance and usability, filters like stock symbols or specific timeframes can be applied to extract and embed only the most pertinent information, ensuring the system delivers tailored insights efficiently. A sample record is sketched after the list of key characteristics below.

Key Characteristics of the Dataset

  • Time-Stamped Data: created_at and updated_at enable chronological tracking of news updates.
  • Stock-Specific Focus: The symbols field connects news articles to particular stocks, making it ideal for targeted financial analysis.
  • Rich Content: The headline, summary, and content fields provide varying levels of detail, suitable for different use cases like quick alerts or in-depth analysis.
  • URL Integration: The url field allows users to access the full article for further context or verification.
  • Structured for Flexibility: The dataset’s format supports filtering by attributes like symbols, timestamps, or content presence, enabling customized data retrieval and analysis.
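
Putting these characteristics together, a single record in the news_out.jsonl file used later in this article might look like the following. All values are illustrative, not actual entries from the dataset, and the record is pretty-printed here; in the JSONL file each record occupies a single line:

{
  "id": 1,
  "created_at": "2024-06-11T13:02:10Z",
  "updated_at": "2024-06-11T13:05:32Z",
  "symbols": ["VRTS"],
  "headline": "Piper Sandler Raises Price Target on Virtus Investment Partners",
  "summary": "Piper Sandler updated its price target for Virtus Investment Partners (NYSE: VRTS).",
  "content": "Full article text goes here...",
  "url": "https://example.com/article"
}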

Implementing Indexing and Querying pipelines with Haystack

Indexing pipeline

The BenzingaEmbedder class defines a pipeline for processing and embedding news data using Haystack components. Its primary purpose is to take raw data, clean and preprocess it, generate embeddings, and store the results in a document store. Here’s a breakdown of its functionality:

The constructor (__init__) sets up the pipeline with several components:

  • BenzingaNews: Fetches or parses raw news data.
  • DocumentCleaner: Removes empty lines, extra whitespaces, and redundant substrings from the content.
  • DocumentSplitter: Splits cleaned text into smaller passages of a specified length.
  • OpenAIDocumentEmbedder: Generates embeddings for the processed text using OpenAI’s API.
  • DocumentWriter: Stores the processed and embedded documents in a document store with overwrite capability.

The pipeline connects these components in a logical sequence to ensure smooth data flow.

import logging
from pathlib import Path
from typing import List, Union

from haystack import Document, Pipeline, component
from haystack.components.embedders import OpenAIDocumentEmbedder
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack.dataclasses import ByteStream
from haystack.document_stores.types import DuplicatePolicy
from haystack.utils import Secret

# BenzingaNews is a custom converter component from the book's project that
# parses raw news items into Haystack Documents; the import path below is
# illustrative and depends on your project layout.
from benzinga_news import BenzingaNews

logger = logging.getLogger(__name__)


@component
class BenzingaEmbedder:

    def __init__(self, document_store, open_ai_key):
        logger.info("Initializing BenzingaEmbedder pipeline.")
        try:
            get_news = BenzingaNews()
            document_cleaner = DocumentCleaner(
                                remove_empty_lines=True,
                                remove_extra_whitespaces=True,
                                remove_repeated_substrings=False
                            )
            document_splitter = DocumentSplitter(split_by="passage", split_length=5)
            document_writer = DocumentWriter(document_store=document_store,
                                             policy=DuplicatePolicy.OVERWRITE)
            embedding = OpenAIDocumentEmbedder(api_key=Secret.from_token(open_ai_key))

            self.pipeline = Pipeline()
            self.pipeline.add_component("get_news", get_news)
            self.pipeline.add_component("document_cleaner", document_cleaner)
            self.pipeline.add_component("document_splitter", document_splitter)
            self.pipeline.add_component("embedding", embedding)
            self.pipeline.add_component("document_writer", document_writer)

            self.pipeline.connect("get_news", "document_cleaner")
            self.pipeline.connect("document_cleaner", "document_splitter")
            self.pipeline.connect("document_splitter", "embedding")
            self.pipeline.connect("embedding", "document_writer")

            logger.info("Pipeline initialized successfully.")
        except Exception as e:
            logger.error(f"Error during BenzingaEmbedder initialization: {e}")
            raise

    @component.output_types(documents=List[Document])
    def run(self, event: List[Union[str, Path, ByteStream]]):
        logger.info(f"Running BenzingaEmbedder with event: {event}")
        try:
            documents = self.pipeline.run({"get_news": {"sources": [event]}})
            # Render the pipeline graph to a PNG for inspection.
            self.pipeline.draw("benzinga_pipeline.png")
            logger.info("Pipeline executed successfully.")
            return documents
        except Exception as e:
            logger.error(f"Error during pipeline execution: {e}")
            raise

This component is designed for scenarios requiring systematic processing of textual data, such as financial news articles, to generate embeddings for downstream tasks like semantic search or question-answering systems.
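
A hedged usage sketch of the component, assuming an in-memory document store and an OPENAI_API_KEY environment variable; the file name passed to run is hypothetical:

# Embed a single raw news item into an in-memory store.
import os

from haystack.document_stores.in_memory import InMemoryDocumentStore

document_store = InMemoryDocumentStore()
embedder = BenzingaEmbedder(document_store, os.getenv("OPENAI_API_KEY"))

# The event can be a path, string, or ByteStream, per the run signature above.
embedder.run("news_item.json")
print(document_store.count_documents())  # number of embedded chunks stored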

Querying pipeline

The RetrieveDocuments class defines a query pipeline for retrieving relevant documents from a document store using embeddings and extractive reading. It is initialized with three main components:

  • Text Embedder: Converts the query into a vector embedding using OpenAI’s API.
  • Retriever: Finds relevant documents from an in-memory document store based on the query embedding.
  • Reader: Extracts precise answers or relevant content from the retrieved documents.

These components are connected in a logical flow, where the embedder generates the query embedding, the retriever fetches matching documents, and the reader analyzes the results to provide detailed responses.

The run method takes a query and a list of symbols (reserved for future filtering; the current pipeline does not use it), processes the query through the pipeline, and returns the result.

import logging

from haystack import Pipeline
from haystack.components.embedders import OpenAITextEmbedder
from haystack.components.readers import ExtractiveReader
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.utils import Secret

logger = logging.getLogger(__name__)


class RetrieveDocuments:
    def __init__(self, doc_store, open_ai_key):
        # Initialize components
        text_embedder = OpenAITextEmbedder(api_key=Secret.from_token(open_ai_key))
        retriever = InMemoryEmbeddingRetriever(document_store=doc_store)
        reader = ExtractiveReader()
        reader.warm_up()  # load the extractive QA model before first use

        # Build the pipeline
        self.query_pipeline = Pipeline()
        self.query_pipeline.add_component("embedder", text_embedder)
        self.query_pipeline.add_component("retriever", retriever)
        self.query_pipeline.add_component("reader", reader)

        # Connect components: the query embedding feeds the retriever, and
        # the retrieved documents feed the reader
        self.query_pipeline.connect("embedder.embedding", "retriever.query_embedding")
        self.query_pipeline.connect("retriever.documents", "reader.documents")

    def run(self, query, symbols):
        # `symbols` is accepted for future metadata filtering; the current
        # pipeline does not use it.
        logger.info(f"Running query pipeline with query: {query}")
        self.query_pipeline.draw("query_pipeline.png")
        # Pass the query through the pipeline
        response = self.query_pipeline.run(
            data={"embedder": {"text": query},
                  "retriever": {"top_k": 3},
                  "reader": {"query": query, "top_k": 2}}
        )
        logger.info(f"Response: {response}")
        return response

This class enables efficient semantic search and question-answering workflows in applications that leverage RAG techniques.
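
A hedged usage sketch, assuming document_store has already been populated by the indexing pipeline above; the question and symbol are illustrative:

# Query the populated store and print the extracted answers.
import os

query_pipeline = RetrieveDocuments(document_store, os.getenv("OPENAI_API_KEY"))
response = query_pipeline.run(
    "What is Piper Sandler's price target for Virtus?", symbols=["VRTS"]
)
for answer in response["reader"]["answers"]:
    # answer.data is None for the reader's "no answer" candidate
    print(answer.data, answer.score)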

Building an Indexing dataflow with Bytewax and Haystack

The run_pipeline_with_symbol function defines and executes a data processing pipeline for embedding and storing filtered news data associated with a specific symbol. Here's how it works:

  1. Initialization: It initializes a BenzingaEmbedder instance with a document store and an OpenAI API key. This embedder handles the preprocessing, embedding, and storage of documents.
  2. Event Processing: A nested process_event function wraps the embedding process. It takes an event, runs it through the embedder, and returns the processed document.
  3. Pipeline Definition: A Dataflow named "rag-pipeline" is created to define the pipeline stages:
    • Input: Reads data from a JSONL file (news_out.jsonl) using FileSource.
    • Deserialization: Converts raw input data into structured JSON objects using safe_deserialize.
    • Filtering: Filters events to include only those relevant to the specified symbol, using a lambda function and the filter_data helper.
    • Embedding: Processes filtered data through the process_event function to generate document embeddings.
    • Output: Writes the processed embeddings to the standard output (StdOutSink).

The pipeline processes the input data, filters for relevant events, generates embeddings, and outputs the results. The Dataflow object is returned for execution. (The safe_deserialize and filter_data helpers are sketched after the code below.)

import bytewax.operators as op
from bytewax.connectors.files import FileSource
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow

# safe_deserialize and filter_data are small project helpers (sketched after
# this section); they parse JSONL lines and filter events by stock symbol.

def run_pipeline_with_symbol(symbol, document_store, open_ai_key):
    embed_benzinga = BenzingaEmbedder(document_store, open_ai_key)

    def process_event(event):
        """Wrapper to handle the processing of each event."""
        if event:
            document = embed_benzinga.run(event)
            return document
        return None

    flow = Dataflow("rag-pipeline")
    input_data = op.input("input", flow, FileSource("news_out.jsonl"))
    deserialize_data = op.map("deserialize", input_data, safe_deserialize)

    # Use a lambda to pass the symbol to the filter_data function
    filtered_data = op.filter("filter_data", deserialize_data,
                              lambda event: filter_data(event, symbol))
    embed_data = op.map("embed_data", filtered_data, process_event)
    op.output("output", embed_data, StdOutSink())
    return flow

This function encapsulates a streamlined workflow for dynamically filtering, embedding, and storing data tied to a specific symbol, making it ideal for real-time or batch processing in retrieval-augmented generation (RAG) pipelines.
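
The dataflow above depends on two small helpers, safe_deserialize and filter_data, that are defined elsewhere in the project. A minimal sketch of what they might look like, assuming the dataset schema described earlier (this is an assumption, not the book's code):

import json
import logging

logger = logging.getLogger(__name__)

def safe_deserialize(raw_line):
    """Parse one JSONL line, returning None (and logging) on malformed input."""
    try:
        return json.loads(raw_line)
    except json.JSONDecodeError as e:
        logger.warning(f"Skipping malformed line: {e}")
        return None

def filter_data(event, symbol):
    """Keep only events that are tagged with the target stock symbol."""
    if not event:
        return False
    return symbol in (event.get("symbols") or [])

Returning None from safe_deserialize lets the downstream filter step discard malformed lines without halting the flow.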

Implementing the API

This solution integrates FastAPI, Bytewax, and Haystack to build a high-performance API for real-time data indexing and retrieval in Retrieval-Augmented Generation (RAG) workflows. It processes symbols (e.g., financial tickers) and stores the data in memory for efficient querying, supporting use cases like dynamic data augmentation for language models.

Key Components and Workflow:

The API is defined using FastAPI, providing endpoints for querying data (/query/) and health checks (/). The /query/ endpoint accepts a list of symbols to index and a natural language query. Input validation is handled with Pydantic, and clear HTTP error responses are provided for invalid inputs.

Data ingestion is powered by Bytewax, a distributed dataflow framework that processes each symbol concurrently using asyncio. Bytewax dynamically populates an InMemoryDocumentStore, which serves as temporary, low-latency storage for the retrieved data.

Once indexing is complete, Haystack’s retrieval pipeline is used to process the user’s query. This pipeline employs embedding-based retrieval methods to fetch relevant information and return a response. Combined with Bytewax’s scalability and FastAPI’s asynchronous capabilities, the solution ensures efficient, real-time processing.

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from bytewax.run import cli_main
from haystack.document_stores.in_memory import InMemoryDocumentStore
import asyncio
import logging
import os

from indexing_dataflow import run_pipeline_with_symbol 
from querying import RetrieveDocuments  

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI()

# OpenAI API key from environment
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Global tasks dictionary to track active Bytewax flows (declared for future
# use; not consulted in this minimal version)
active_tasks = {}

# Request model for query
class QueryRequest(BaseModel):
    symbols: str  # Comma-separated list of symbols
    question: str  # Natural language query


@app.post("/query/")
async def process_query(request: QueryRequest):
    """
    Index symbols using Bytewax and query the in-memory document store.
    """
    # Parse symbols
    symbols = [symbol.strip() for symbol in request.symbols.split(",") if symbol.strip()]
    if not symbols:
        raise HTTPException(status_code=400, detail="At least one symbol must be provided.")

    # Create a new in-memory document store for this session
    document_store = InMemoryDocumentStore()

    # Start Bytewax dataflow for each symbol
    def run_bytewax(symbol):
        """Run Bytewax for a given symbol."""
        try:
            flow = run_pipeline_with_symbol(symbol, document_store, OPENAI_API_KEY)
            cli_main(flow)
        except Exception as e:
            logger.error(f"Error in Bytewax flow for symbol '{symbol}': {e}")
            raise

    # Run Bytewax flows concurrently for all symbols
    tasks = [asyncio.to_thread(run_bytewax, symbol) for symbol in symbols]
    try:
        await asyncio.gather(*tasks)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error running Bytewax dataflow: {e}")

    # Initialize query pipeline with the populated document store
    query_pipeline = RetrieveDocuments(document_store, OPENAI_API_KEY)

    # Use the pipeline to query
    try:
        response = query_pipeline.run(request.question, symbols)
        return {"answer": response}
    except Exception as e:
        logger.error(f"Error querying the pipeline: {e}")
        raise HTTPException(status_code=500, detail="Error querying the pipeline.")

@app.get("/")
def health_check():
    """Health check endpoint."""
    return {"status": "API is running"}

The solution provides real-time indexing and querying, low-latency responses, and a modular architecture that can be easily extended for various RAG use cases. It’s ideal for applications requiring dynamic data processing, such as financial data retrieval, personalized recommendations, or augmented AI workflows.
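
With the server running (for example via uvicorn main:app), a hedged client-side example using the requests library might look like this; the host, port, and question are illustrative:

# Call the /query/ endpoint and print the answer.
import requests

payload = {
    "symbols": "VRTS,FFIE",
    "question": "What did Piper Sandler say about Virtus Investment Partners?",
}
resp = requests.post("http://localhost:8000/query/", json=payload)
resp.raise_for_status()
print(resp.json()["answer"])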

Summary

In this article, we explored how to integrate Haystack, Bytewax and FastAPI to build a high-performance API for real-time data processing and retrieval in a RAG workflow. Key workflows include indexing news data by processing symbols dynamically through Bytewax and embedding the content for fast retrieval using Haystack.

Queries are processed via an optimized pipeline that generates embeddings, retrieves relevant documents, and extracts precise answers. Robust error handling and input validation keep the solution reliable, and adding monitoring with tools like Prometheus can help make it production-ready.

By leveraging the strengths of these tools, we can create a modular, efficient, and scalable RAG pipeline as an API.

This approach is ideal for use cases like financial data retrieval, real-time content augmentation, and intelligent search systems.

References

This article contains code snippets from Laura Funderburk's book "Building Natural Language Pipelines" published by Packt. https://github.com/PacktPublishing/Building-Natural-Language-Pipelines
