From Request to Stream: A Deep Dive into How to Use Bytewax to Poll HTTP Endpoints to Create a Real-Time Stream of Data
Introduction
In the dynamic landscape of today’s business environment, the ability to access and respond to the most recent data is not just an advantage, it’s a necessity. Companies across the globe are searching for strategies and technologies that enable them to process and analyze data in real time, ensuring operations happen in a timely and effective manner. This demand propels us into the exploration of mechanisms like polling HTTP endpoints – a technique pivotal in the contemporary data ecosystem for retrieving the latest data directly from the source.
Polling HTTP endpoints refers to the automated process of sending requests to a specific HTTP URL at regular intervals. This procedure ensures that businesses can capture, process, and respond to the most recent and relevant data. In industries where data is continuously generated and updated—such as e-commerce, finance, and social media—this real-time data retrieval is integral for instant analytics, immediate decision-making, and timely response to market trends and customer behaviors.
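To make the idea concrete before introducing Bytewax, here is a minimal sketch of naive polling in Python: hit an endpoint on a fixed interval and hand each JSON payload to a callback. The URL, handler, and 15-second interval are placeholders, and this loop gives you no parallelism, recovery, or downstream processing; that is exactly the gap a dataflow framework fills.
import time
import requests

def poll(url: str, handle, interval_seconds: float = 15.0) -> None:
    # Naive polling: fetch `url` every `interval_seconds` and pass the JSON payload to `handle`.
    while True:
        handle(requests.get(url).json())
        time.sleep(interval_seconds)

# Hypothetical usage: poll("https://example.com/api/latest.json", print)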
In this deep dive, we’ll show how you can use the periodic input mechanism, available in Bytewax since v0.17.1, to poll HTTP endpoints, unveiling the step-by-step process to poll, retrieve, and stream data in real time.
Diving into Real-Time Data Polling with Bytewax
In this piece, we’re focusing on how to effectively retrieve real-time data using Bytewax, demonstrated with a Python script that pulls data from the Hacker News API. We are going to walk through a dataflow program that continuously polls the Hacker News API, retrieves a set of IDs, distributes those IDs to different workers, and then retrieves each item’s metadata along with the ID of its root item. You can see the entire dataflow program below; after that, we will dissect the code to understand its structure and functionality.
import requests
from datetime import datetime, timedelta
import time
from typing import Any, Optional

from bytewax.connectors.periodic import SimplePollingInput
from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class HNInput(SimplePollingInput):
    def __init__(self, interval: timedelta, align_to: Optional[datetime] = None, init_item: Optional[int] = None):
        super().__init__(interval, align_to)
        logger.info(f"received starting id: {init_item}")
        self.max_id = init_item

    def next_item(self):
        '''
        Get all the items from the Hacker News API between
        the last max id and the current max id.
        '''
        if not self.max_id:
            self.max_id = requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json()
        new_max_id = requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json()
        logger.info(f"current id: {self.max_id}, new id: {new_max_id}")
        ids = [int(i) for i in range(self.max_id, new_max_id)]
        self.max_id = new_max_id
        return ids


def download_metadata(hn_id):
    # Given a Hacker News id returned from the API, fetch its metadata
    req = requests.get(
        f"https://hacker-news.firebaseio.com/v0/item/{hn_id}.json"
    )
    if not req.json():
        logger.warning(f"error getting payload from item {hn_id} trying again")
        time.sleep(0.5)
        return download_metadata(hn_id)
    return req.json()


def recurse_tree(metadata):
    try:
        parent_id = metadata["parent"]
        parent_metadata = download_metadata(parent_id)
        return recurse_tree(parent_metadata)
    except KeyError:
        return (metadata["id"], {**metadata, "key_id": metadata["id"]})


def key_on_parent(metadata: dict) -> tuple:
    key, metadata = recurse_tree(metadata)
    return (key, metadata)


def run_hn_flow(init_item):
    flow = Dataflow()
    flow.input("in", HNInput(timedelta(seconds=15), None, init_item))  # skip the align_to argument
    flow.flat_map(lambda x: x)
    # If you run this dataflow with multiple workers, downloads in
    # the next `map` will be parallelized thanks to .redistribute()
    flow.redistribute()
    flow.map(download_metadata)
    flow.inspect(logger.info)
    # We want to keep related data together so let's build a
    # traversal function to get the ultimate parent
    flow.map(key_on_parent)
    flow.output("std-out", StdOutput())
    return flow
Breaking it Down
Import Necessary Libraries and Modules
import requests
from datetime import datetime, timedelta
from typing import Any, Optional
from bytewax.connectors.periodic import SimplePollingInput
from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
We start by importing standard libraries and modules. The requests library is used for making HTTP requests to the Hacker News API. We’re also setting up logging to capture informational messages during the script’s execution, aiding in debugging and verification of the data retrieval process.
Create a Custom Input Class
class HNInput(SimplePollingInput):
    def __init__(self, interval: timedelta, align_to: Optional[datetime] = None, init_item: Optional[int] = None):
        super().__init__(interval, align_to)
        logger.info(f"received starting id: {init_item}")
        self.max_id = init_item
Here, a custom input class HNInput is defined, inheriting from Bytewax’s SimplePollingInput. The __init__ method is extended to log the initial item ID and set it as max_id, which is used as the starting point for retrieving new items from the API. If it is omitted by passing None, the dataflow will simply start processing from the latest item ID.
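If you do want to backfill, you need a concrete starting ID to pass as init_item. A small helper along the lines of the sketch below (the function name and the offset of 1000 are arbitrary choices, not part of the original script) queries the same maxitem endpoint that next_item uses and steps back a fixed number of items:
import requests

def recent_start_id(offset: int = 1000) -> int:
    # Start the backfill `offset` items behind the current Hacker News max item id.
    max_id = requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json()
    return max_id - offset
You could then construct the input as HNInput(timedelta(seconds=15), None, recent_start_id()).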
Implement the Data Retrieval Method
def next_item(self):
    '''
    Get all the items from the Hacker News API between
    the last max id and the current max id.
    '''
    if not self.max_id:
        self.max_id = requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json()
    new_max_id = requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json()
    logger.info(f"current id: {self.max_id}, new id: {new_max_id}")
    ids = [int(i) for i in range(self.max_id, new_max_id)]
    self.max_id = new_max_id
    return ids
The next_item method is the core of the data retrieval process. It makes HTTP requests to the Hacker News API to get the current maximum item ID. It then calculates the range of new item IDs that have been added since the last retrieval and updates max_id to the current maximum. This list of new item IDs is returned for further processing.
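A quick note on the range semantics: because range is half-open, the new maximum itself is not emitted in the current cycle; it becomes the starting point of the next poll, and if no new items have appeared the returned list is simply empty.
# Illustration with made-up ids: self.max_id == 100 and new_max_id == 103.
assert list(range(100, 103)) == [100, 101, 102]  # 103 is picked up on the next poll
assert list(range(103, 103)) == []               # no new items, nothing emitted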
This script efficiently retrieves new item IDs from the Hacker News API at regular intervals, facilitated by the Bytewax library. It’s a practical example for Python developers looking to implement real-time data retrieval in their applications. The logged messages assist in monitoring the script’s progress and verifying its functionality, ensuring that it’s always clear which items are being retrieved and processed.
Extending the Data Retrieval Pipeline with Metadata Enrichment and Seeking the Root Item ID
With the real-time data retrieval mechanism in place, the next step is to extend this capability to include metadata extraction and to find the root parent of each item. The items can be comments, polls, poll options, or stories. We want to recurse up to the ultimate parent of the comment and add that as a key. This will allow us to efficiently compute statistics about a story in real time, such as how many comments it has received in the past 15 minutes (that is left as an exercise for the reader, but a good starting point can be found here). In this section, we’ll walk through the rest of the Python script that completes this process.
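As a taste of what that keying enables, here is a rough, non-Bytewax sketch of counting how many new items arrive under each root story. The names are hypothetical and it ignores time windows; replacing the plain counter with Bytewax’s windowing operators is the exercise mentioned above.
from collections import Counter

story_activity: Counter = Counter()

def tally(keyed_item: tuple) -> None:
    # keyed_item is the (root_id, metadata) pair produced by key_on_parent.
    root_id, _metadata = keyed_item
    story_activity[root_id] += 1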
Extracting Metadata with Recursion
def download_metadata(hn_id):
    # Given a Hacker News id returned from the API, fetch its metadata
    req = requests.get(
        f"https://hacker-news.firebaseio.com/v0/item/{hn_id}.json"
    )
    if not req.json():
        logger.warning(f"error getting payload from item {hn_id} trying again")
        time.sleep(0.5)
        return download_metadata(hn_id)
    return req.json()
The download_metadata function is tasked with fetching detailed metadata for each Hacker News ID. It sends a request to the Hacker News API and retrieves the corresponding item’s JSON payload. If the request fails to retrieve data, the function retries after a short pause.
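One caveat: the recursive retry never gives up, so a permanently missing or deleted item would keep a worker looping. A bounded variant, sketched below with a hypothetical name and max_retries parameter, keeps the same endpoint and 0.5-second pause but returns None once its attempts are exhausted:
import logging
import time
import requests

logger = logging.getLogger(__name__)

def download_metadata_bounded(hn_id, max_retries: int = 5):
    # Same endpoint and pause as above, but give up after `max_retries` attempts.
    for _ in range(max_retries):
        payload = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{hn_id}.json"
        ).json()
        if payload:
            return payload
        logger.warning(f"no payload for item {hn_id}, retrying")
        time.sleep(0.5)
    return None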
Traversing the Comment Thread
def recurse_tree(metadata):
    try:
        parent_id = metadata["parent"]
        parent_metadata = download_metadata(parent_id)
        return recurse_tree(parent_metadata)
    except KeyError:
        return (metadata["id"], {**metadata, "key_id": metadata["id"]})


def key_on_parent(metadata: dict) -> tuple:
    key, metadata = recurse_tree(metadata)
    return (key, metadata)
To trace a specific item back to the root of its thread, the recurse_tree function recursively retrieves the parent items until it reaches the root. This helps keep the entire context of a conversation or comment thread together.
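Because each level of the thread costs one HTTP request and one Python stack frame, very deep threads can get expensive. An equivalent iterative walk, sketched below under a hypothetical name and reusing download_metadata from above, climbs the parent links in a loop and produces the same (root_id, root_metadata) pair:
def key_on_root(metadata: dict) -> tuple:
    # Follow "parent" links until we reach an item with no parent (the story or poll).
    while "parent" in metadata:
        metadata = download_metadata(metadata["parent"])
    return (metadata["id"], {**metadata, "key_id": metadata["id"]})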
Building and Running the Dataflow
def run_hn_flow(init_item):
    flow = Dataflow()
    flow.input("in", HNInput(timedelta(seconds=15), None, init_item))
    flow.flat_map(lambda x: x)
    flow.redistribute()
    flow.map(download_metadata)
    flow.inspect(logger.info)
    flow.map(key_on_parent)
    flow.output("std-out", StdOutput())
    return flow
The run_hn_flow function integrates all the previously defined functions and methods, creating a complete dataflow. The data is retrieved, redistributed among multiple workers for parallel processing (if available), enriched with metadata, and keyed by its root parent item. Finally, the data is sent to standard output, ready to be consumed, analyzed, or stored.
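Concretely, each record that reaches StdOutput is a (key, value) tuple: the key is the root story’s ID and the value is that root’s metadata with the extra key_id field added in recurse_tree. The field values below are invented purely for illustration:
# One record as it might appear on stdout (made-up values):
(37771380, {"id": 37771380, "type": "story", "by": "someuser", "title": "Example story", "key_id": 37771380})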
Running our Dataflow
We’ve implemented a parameterized dataflow by wrapping our flow up in a function. This allows us to easily change the starting item ID so that we can backfill from an earlier point. Running a parameterized dataflow is similar to the way we usually run a Python module with the python -m pkg:module command familiar to Flask and FastAPI users, except that we wrap the dataflow-building call in a string. You can see the full command below.
python -m bytewax.run "dataflow:run_hn_flow(item_id)"
Where item_id is the starting id number for the dataflow program.
Scaling Up
If you are backfilling and there are a lot of requests being made to get the parent id and the metadata, you may want to parallelize your dataflow. To do this, you can add the number of processes you would like to run with the -p argument.
python -m bytewax.run "dataflow:run_hn_flow(37771380)" -p3
Note that for this particular type of workflow, where the initial set of IDs cannot easily be parallelized, the input will be limited to a single worker; only once you redistribute will the workers share the load.
Wrapping Everything Up
This dataflow program represents a comprehensive approach to real-time data retrieval and processing. Starting with polling the Hacker News API for new items, it then distributes, enriches, and outputs the data efficiently. By leveraging Bytewax’s straightforward interface and Python’s expressive syntax, developers can adapt and extend this script to fit specific needs, integrating real-time data into applications and analytics workflows seamlessly.
Like what you see? Give us a ⭐ on the Bytewax GitHub repo.