Real-time content update detection in production with Bytewax

By Laura Funderburk

One of the challenges in data systems is managing the presence of data updates. A well-designed data system will incorporate unique identifiers (IDs) for each record, ensuring data integrity and accuracy. Identifying when an update has taken place is crucial for maintaining data consistency, triggering timely actions, and improving decision-making processes. Real-time content update detection enables organizations to respond promptly to changes, ensuring that their systems and users are always working with the most current information.

One example is the generation of vectors in a retrieval augmented generation (RAG) system. We explored previously how we can leverage Bytewax along with the Python ecoystem to build real-time RAG systems, but what happens when an entry is updated? How can we detect these instances in real time?

In this blog, we will outline how you can set up an alert mechanism for when a data update has taken place using Kafka, Python and Bytewax. By leveraging these tools, you can create a robust system that monitors and detects updates efficiently.

Problem statement

The challenge at hand is to efficiently ingest real-time news data from Alpaca's Benzinga news feed using a robust pipeline setup. The goal is to establish a system that can handle high-throughput data ingestion, detect updates, and process news articles in real-time. The primary focus is on integrating Alpaca news data into a streaming data pipeline, ensuring scalability and reliability for production environments while also providing a simplified local setup for development and testing purposes.

Real-time Data Ingestion

Key Objectives:

  1. Real-time Data Ingestion: Implement a real-time data ingestion pipeline to capture news articles as they are published via the Alpaca news websocket.
  2. Scalable Production Deployment: Use Kafka or Redpanda to handle data streaming in a production environment, ensuring the system can scale with increased data loads.
  3. Local Testing and Development: Provide an alternative local setup using JSON lines files to facilitate development and testing without the need for Kafka.
  4. Data Processing and Transformation: Process incoming news articles to extract relevant information and prepare them for downstream analysis and storage.
  5. Alert Mechanism for Updates: Develop a mechanism to detect and alert on updates to news articles within specific time windows, enhancing real-time monitoring and response capabilities.
  6. Deploy Workflows on Cloud Providers: Manage scripts and deploy them onto services like AWS, GCP and Azure.

In the following sections, we will describe how to set up these pipelines, configure the environment, and deploy the solution both locally and remotely using Bytewax.

Real-time Data Ingestion of News

Real-time data ingestion is a crucial aspect of modern data systems, enabling immediate access to fresh and relevant information. By setting up a robust pipeline for real-time data ingestion, organizations can ensure that their applications and services operate with the most current data available, facilitating timely decision-making and enhancing operational efficiency.

Introducing Bytewax

Bytewax is a Python-native data processing framework designed for building real-time processes . It leverages the concepts of stateful stream processing to handle complex data workflows ("dataflows") efficiently. Bytewax integrates seamlessly with various data sources and sinks, including Kafka, which is widely used for high-throughput, low-latency data streaming.

Kafka Connectors

Apache Kafka is a distributed streaming platform that allows you to publish and subscribe to streams of records. It is highly scalable and fault-tolerant, making it ideal for real-time data ingestion and processing. Bytewax provides connectors to Kafka, enabling seamless integration between your data processing logic and Kafka topics. This allows for efficient ingestion of data into Kafka and subsequent processing by Bytewax.

Managing Dataflows with waxctl

Waxctl helps you run and manage your Bytewax dataflows in Kubernetes. It uses the current kubectl context configuration, so you need kubectl configured to access the desired Kubernetes cluster for your dataflows.

Let's dive into building the system leveraging the tools outlined above.

Building a real time ingestion pipeline with Bytewax, Kafka and Python

You can find a complete implementation of a local Kafka instance and a Bytewax dataflow here.

Let's expand on the ingestion dataflow. In this script, we extract topics using the Alpaca entry point and populate a Kafka instance. In this script we perform the following key steps:

  1. Environment Setup: Setting up environment variables for the Alpaca API and Kafka brokers.
import json
import os
from dataclasses import dataclass, field
from datetime import timedelta
from typing import List, Dict

import websockets
from bytewax import operators as op
from bytewax.connectors.files import FileSink
from bytewax.dataflow import Dataflow
from bytewax.inputs import FixedPartitionedSource, StatefulSourcePartition, batch_async
from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.kafka import KafkaSinkMessage

API_KEY = os.getenv("API_KEY")
API_SECRET = os.getenv("API_SECRET")

BROKERS = os.getenv("BROKER")
OUT_TOPIC = os.getenv("TOPIC_NEWS")
  1. News Aggregator: An asynchronous function, news_aggregator, connects to the Alpaca news websocket, authenticates using the provided API keys, and subscribes to the news feed. It continuously receives news articles and yields them for further processing.
async def news_aggregator(ticker):
    url = "wss://stream.data.alpaca.markets/v1beta1/news"
    async with websockets.connect(url) as websocket:
        await websocket.send(json.dumps({"action": "auth", 
                                         "key": API_KEY, 
                                         "secret": API_SECRET}))
        await websocket.recv()  # Ignore auth response
        await websocket.send(json.dumps({"action": "subscribe", 
                                         "news": [ticker]}))
        await websocket.recv()  # Ignore subscription response
        await websocket.recv()

        while True:
            message = await websocket.recv()
            articles = json.loads(message)
            yield articles
  1. News Partition: This class implements StatefulSourcePartition from Bytewax, using the news_aggregator to fetch batches of news articles. The pattern below enables fetching the next batch of data, snapshotting state (stateless in this case), and closing the connection.
class NewsPartition(StatefulSourcePartition):
    def __init__(self, ticker):
        self.ticker = ticker
        self.batcher = batch_async(news_aggregator(ticker), 
                                   timedelta(seconds=0.5), 100)

    def next_batch(self):
        return next(self.batcher)

    def snapshot(self):
        return None  # Stateless for now

    def close(self):
        pass  # The async context 
              # manager will handle closing 
              # the WebSocket.
  1. News Source: The NewsSource class inherits from FixedPartitionedSource and provides a list of tickers for partitioning the data stream. It builds partitions using the NewsPartition class.
@dataclass
class NewsSource(FixedPartitionedSource):
    tickers: List[str] = field(default_factory=lambda: ["*"])

    def list_parts(self):
        return self.tickers

    def build_part(self, step_id, for_key, _resume_state):
        return NewsPartition(for_key)
  1. Dataflow Setup: The Dataflow object is where streaming happens. We can initialize our dataflow and input as follows. We initialize the dataflow and name it "news_input". We can then use the input operator and our NewsSource to get news articles for any ticker.
ticker_list = ["*"]
flow = Dataflow("news_loader")
inp = op.input("news_input", 
               flow, 
               NewsSource(ticker_list)).then(op.flat_map, 
                                             "flatten", 
                                             lambda x: x)

We can apply another map operation to serialization and output: The script includes functions to serialize news articles either for Kafka or for writing to a JSON lines file.

def serialize_k(news)-> KafkaSinkMessage[Dict, Dict]:
    return KafkaSinkMessage(
        key=json.dumps(news['symbols'][0]),
        value=json.dumps(news),
    )

print(f"Connecting to brokers at: {BROKERS}, Topic: {OUT_TOPIC}")

serialized = op.map("serialize", inp, serialize_k)

broker_config = {
    "security_protocol":"SASL_SSL",
    "sasl_mechanism":"SCRAM-SHA-256",
    "sasl_plain_username":"demo",
    "sasl_plain_password":"Qq2EnlzHpzv3RZDAMjZzfZCwrFZyhK"
    }
op.output("out1", serialized, brokers=BROKERS, topic=OUT_TOPIC, )

A list of entries obtained via this script can be found here. This is a sample data output:

{"T": "n", "id": 39063802, "headline": "Piper Sandler Maintains Overweight on Virtus Inv, Raises Price Target to $267", "summary": "Piper Sandler  analyst Crispin Love   maintains Virtus Inv (NYSE:VRTS) with a Overweight and raises the price target from $265 to $267.", "author": "Benzinga Newsdesk", "created_at": "2024-05-29T13:26:51Z", "updated_at": "2024-05-29T13:26:52Z", "url": "https://www.benzinga.com/news/24/05/39063802/piper-sandler-maintains-overweight-on-virtus-inv-raises-price-target-to-267", "content": "Piper Sandler  analyst Crispin Love   maintains Virtus Inv (NYSE:<a class=\"ticker\" href=\"https://www.benzinga.com/stock/VRTS#NYSE\">VRTS</a>) with a Overweight and raises the price target from $265 to $267.", "symbols": ["VRTS"], "source": "benzinga"}
{"T": "n", "id": 39063805, "headline": "Faraday Future To Share Details Of &#34;US-China Automotive Industry Bridge Strategy&#34; Including Phase 1 In Coming Months; Receives Nasdaq Grant For Extended Suspension Pending Hearing", "summary": "", "author": "Benzinga Newsdesk", "created_at": "2024-05-29T13:27:16Z", "updated_at": "2024-05-29T13:27:17Z", "url": "https://www.benzinga.com/news/24/05/39063805/faraday-future-to-share-details-of-us-china-automotive-industry-bridge-strategy-including-phase-1-in", "content": "", "symbols": ["FFIE"], "source": "benzinga"}

The data contains the following fields:

  • T - ticker symbol
  • id - ID uniquely identifying an article
  • headline - article headline
  • summary - brief description of the article
  • author - who authored the article
  • created_at - when was the article created
  • updated_at - when was the article updated
  • url - link to complete article
  • content - content of article
  • symbols - symbols associated with article
  • source - where the article was extracted from

We can execute the dataflow locally via

python -m bytewax.run news_ingestion:flow

To deploy the dataflow in a Kubernetes cluster, we can install and execute as follows:

brew tap bytewax/tap
brew install waxctl

waxctl dataflow deploy news_ingestion.py --name ingest-news-topics

For more information on configuring and setting up waxctl to deploy your dataflows, you can review the waxctl documentation.

In this section, we explored how to set up a real-time dataflow that connects to websockets and populates a Kafka instance. Let's now explore how we can set up a dataflow to detect updates.

Detecting when an article has been updated

The next phase is focused on detecting those entries that have been updated. A key concept we will leverage is the use of windowing techniques. Let's get right to it.

You can find a complete script and dataflow to detect when an entry has been updated in this GitHub repository.

The dataflow is initialized using the same method explored before. We set up the JSONL we generated as input - note you can swap the FileSource input for KafkaSource to read from your Kafka instance. A function to deserialize the data is applied through the map operator, and some data wrangling (time field format conversion) is applied through the use of the filter_map and map Bytewax operators.

We also transform the format into a tuple - we will track each entry through the unique ID, and add key fields to track what has changed.

flow = Dataflow("rag-pipeline")
input_data = op.input("input", flow, FileSource("data/news_out.jsonl"))
deserialize_data = op.filter_map("deserialize", 
                                 input_data, 
                                 safe_deserialize)
transform_data_time = op.map("timeconversion", 
                             deserialize_data, 
                             parse_time)
map_tuple = op.map(
    "tuple_map",
    transform_data_time,
    lambda reading_data: (str(reading_data["id"]), {"created_at":reading_data['created_at'],
                                                    "updated_at":reading_data['updated_at'],
                                                    "headline": reading_data['headline'],
                                                     "content": reading_data['content']}
                                                    ),
)

Let's explore how windowing enables us to detect what entries have changed. We will set up the following Clock configuration and Window. To learn more about Clocks and Windowing, please review our documentation.

In the code below, we will set up our clock configuration such that it reads the time at which the article was updated. We are also going to use a TumblingWindow initialized on the day that the ingestion pipeline was executed - note that you can choose to use SystemClock to use the current time of the system instead.

Given the frequency of the news articles, we will set up our window such that the length is 30 seconds - this means we are looking for articles that were updated within a 30 second window.

event_time_config: EventClock = EventClock(
   ts_getter=lambda e: e['updated_at'], 
    wait_for_system_duration=timedelta(seconds=1)
)
align_to = datetime(2024, 5, 29, 1,  
                    tzinfo=timezone.utc)
clock_config = TumblingWindower(align_to=align_to,
                                length=timedelta(seconds=30))

# Collect the windowed data
window = wop.collect_window(
    "windowed_data", 
    map_tuple, 
    clock=event_time_config, 
    windower=clock_config
)

We are now ready to identify the updated entries. The function below will unpack our tuples containing the id and each line corresponding to the JSONL file. The window_id will identify those entries that fall within the window, with the event containing the entries affected.

We can collect all entries in a list, and only return those whose length exceeds 1.

Finally, we can use the filter_map operator to show us only those entries that were updated within the window we specified.

def find_duplicate_ids_in_window(window):
    """Identify duplicate news ID entries given a specific window"""
    
    # Unpack content of tuple
    id, search_session = window
    window_id, events = search_session

    # Collect duplicate entries
    searches = [event for event in events ]

    
    if len(searches)>1:
        return id, searches
    else:
        return None

calc = op.filter_map("find_updates", window.down, find_duplicate_ids_in_window)

op.output("output", calc, StdOutSink())

In the example shared, we can direct the output such that it is displayed on the terminal at the time of execution. We can execute our dataflow locally via the command

python -m bytewax.run window_dataflow:flow

Here is the output for entries changed within 30 seconds of being created:

('39073809', [{'created_at': datetime.datetime(2024, 5, 29, 20, 25, 1, tzinfo=datetime.timezone.utc), 'updated_at': datetime.datetime(2024, 5, 29, 20, 25, 2, tzinfo=datetime.timezone.utc), 'headline': 'Agilent Technologies Revises Q3 EPS To $1.25-$1.28 Vs $1.45 Est.; Revenue $1.535B-$1.575B Vs $1.72B Est.; FY24 EPS $5.15-$5.25 Vs $5.50 Est.; Revenue $6.42B-$6.5B Vs $6.77B Est.', 'content': '<p><strong>Full Year 2024 and Third-Quarter Outlook</strong></p><p>Full-year revenue outlook is revised at $6.420 billion to $6.500 billion, representing a range of down 6.0% to 4.9% on a reported basis and down 5.4% to 4.3% core(1). Fiscal year 2024 non-GAAP(3) earnings guidance is revised at a range of $5.15 to $5.25 per share.</p><p>The outlook for third-quarter revenue is expected in the range of $1.535 billion to $1.575 billion, a decline of 8.2% to 5.8% reported and 6.9% to 4.5% core(1). Third-quarter non-GAAP(3) earnings guidance is expected in the range of $1.25 to $1.28 per share.</p><p>The outlook is based on forecasted currency exchange rates.</p>'},
              {'created_at': datetime.datetime(2024, 5, 29, 20, 25, 1, tzinfo=datetime.timezone.utc), 'updated_at': datetime.datetime(2024, 5, 29, 20, 25, 16, tzinfo=datetime.timezone.utc), 'headline': 'Agilent Technologies Revises Q3 EPS To $1.25-$1.28 Vs $1.45 Est.; Revenue $1.535B-$1.575B Vs $1.72B Est.; FY24 EPS $5.15-$5.25 Vs $5.50 Est.; Revenue $6.42B-$6.5B Vs $6.77B Est.', 'content': '<p><strong>Full Year 2024 and Third-Quarter Outlook</strong></p>\r\n\r\n<p>Full-year revenue outlook is revised at $6.420 billion to $6.500 billion, representing a range of down 6.0% to 4.9% on a reported basis and down 5.4% to 4.3% core(1). Fiscal year 2024 non-GAAP(3) earnings guidance is revised at a range of $5.15 to $5.25 per share.</p>\r\n\r\n<p>The outlook for third-quarter revenue is expected in the range of $1.535 billion to $1.575 billion, a decline of 8.2% to 5.8% reported and 6.9% to 4.5% core(1). Third-quarter non-GAAP(3) earnings guidance is expected in the range of $1.25 to $1.28 per share.</p>\r\n\r\n<p>The outlook is based on forecasted currency exchange rates.</p>\r\n'}])

We can verify that the update took place within 30 seconds of being created, and can verify the changes in content.

Or, if you'd like to run the dataflow through waxctl

waxctl dataflow deploy window_dataflow.py --name detect-updates

Summary

In this article we explored how we can leverage Bytewax to set up scalable real-time dataflows to extract news articles, and furthermore identify those that have been updated. Given the volume of articles that came through the stream, we leveraged windowing techniques to chunk the data such that we can define when to look for updates. This technique enables us to efficiently identify changes and determine the best course of action (ignoring stale entries, using the last update, or raising an alert if an update has been found).

We also introduced waxctl as a tool to easily manage and deploy several dataflows as exemplified by two Bytewax dataflow: an ingestion dataflow to generate JSONL entries fed into a Kafka instance, and an update detection dataflow that identifies what entries have been updated. These dataflows can ease the operationalization of data processes involving rapidly changing information.

Stay updated with our newsletter

Subscribe and never miss another blog post, announcement, or community event.

Previous post

Laura Funderburk

Senior Developer Advocate
Laura Funderburk holds a B.Sc. in Mathematics from Simon Fraser University and has extensive work experience as a data scientist. She is passionate about leveraging open source for MLOps and DataOps and is dedicated to outreach and education.
Next post