top-kat and Streaming Algorithms in Bytewax

At the beginning of your career, learning data streaming concepts and algorithms well enough to start building real-time applications can feel daunting. Today, we’re sharing a technical deep dive from our intern, Isaac Milstein, to show how people like you (yes, you!) can start a streaming journey and jump into data streams with Bytewax while having fun!

Since my grandfather introduced me to programming years ago, I’ve been fascinated by how people get computers to do what they do. During remote learning, I really got into coding, picking up the basics of a handful of languages to do whatever I happened to be interested in at the time, like web development or video game modding. Now, as an intern at Bytewax, I’ve been learning and training on some of the tooling used at the company. My main exercise has been using PyO3 to write Python bindings for a Rust crate containing a handful of useful probabilistic data structures. These bindings ended up being (debatably) coherent enough for release, and now we have top-kat.

What are probabilistic data structures?

Quite a few problems in large-scale computing demand heavy memory usage, such as counting the distinct items in a set: an exact answer requires remembering every distinct item seen so far, so memory grows linearly with the data. In some cases, it may be impractical or too expensive to dedicate enough memory and compute to solving a problem the traditional way, and scaling horizontally, which typically becomes necessary as the problem grows, adds a lot of complexity. Probabilistic data structures alleviate this issue by solving these problems with sub-linear space and minimal computing power. The main drawback is accuracy: their answers are approximations, usually with a tunable trade-off between memory and error.
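For contrast, here is a minimal sketch of the exact ("traditional") approach in plain Python. A set has to remember every distinct item, so its footprint grows with the number of distinct elements. The helper name exact_distinct is just for illustration, and sys.getsizeof reports only the set's own hash table, not the items stored in it, so the real cost is even higher.

```python
import sys


def exact_distinct(stream):
    """Count distinct items exactly by remembering every one of them."""
    seen = set()
    for item in stream:
        seen.add(item)
    # getsizeof measures the set's hash table only, not the stored objects
    return len(seen), sys.getsizeof(seen)


small_count, small_bytes = exact_distinct(range(1_000))
large_count, large_bytes = exact_distinct(range(100_000))
print(small_count, small_bytes)
print(large_count, large_bytes)  # ~100x the items, roughly 100x the table size
```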

One well-known probabilistic data structure is HyperLogLog, which solves the example above of counting distinct elements. It hashes each element into an evenly distributed binary number and exploits a mathematical property of such numbers: a hash that starts with at least k zeros turns up, on average, only once in every 2^k distinct elements, so the longest run of leading zeros seen so far hints at how many distinct elements have passed by. By tracking that maximum across many buckets (registers) and combining them with a harmonic mean, HyperLogLog gives an approximate answer using O(log(log(n))) bits per register and O(1) time per element.
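To make the idea concrete, here is a minimal pure-Python HyperLogLog sketch. It is an illustration under a few assumptions of our own, not top-kat's or the streaming_algorithms crate's implementation: SHA-1 stands in for the hash function, the class and method names are hypothetical, and only the textbook small-range correction is applied. With p = 10 it keeps 1,024 small registers no matter how many elements stream through.

```python
import hashlib
import math


class HyperLogLog:
    """Minimal HyperLogLog sketch (illustrative, not top-kat's implementation)."""

    def __init__(self, p=10):
        self.p = p                     # hash bits used to pick a register
        self.m = 1 << p                # number of registers (2**p)
        self.registers = [0] * self.m  # each holds the largest rank seen
        # Bias-correction constant from the HyperLogLog paper (valid for m >= 128)
        self.alpha = 0.7213 / (1 + 1.079 / self.m)

    @staticmethod
    def _hash64(item):
        # 64 bits of a SHA-1 digest serve as an evenly distributed binary number
        digest = hashlib.sha1(str(item).encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big")

    def add(self, item):
        x = self._hash64(item)
        width = 64 - self.p
        index = x >> width                    # top p bits choose the register
        rest = x & ((1 << width) - 1)         # remaining bits
        rank = width - rest.bit_length() + 1  # leading zeros in `rest`, plus one
        if rank > self.registers[index]:
            self.registers[index] = rank

    def count(self):
        # Harmonic mean of 2**register across all registers
        z = sum(2.0 ** -r for r in self.registers)
        estimate = self.alpha * self.m * self.m / z
        empty = self.registers.count(0)
        if estimate <= 2.5 * self.m and empty:
            # Small-range correction: fall back to linear counting
            estimate = self.m * math.log(self.m / empty)
        return estimate


hll = HyperLogLog(p=10)
for i in range(50_000):
    hll.add(f"user-{i % 10_000}")  # 10,000 distinct values, each seen 5 times
print(round(hll.count()))  # an estimate near 10,000
```

HyperLogLog's standard error is roughly 1.04 / sqrt(m), about 3% for these 1,024 registers; raising p shrinks the error at the cost of more registers.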

What is top-kat?

top-kat is a Python package I wrote that contains some streaming helper classes and probabilistic data structures (implemented in their streaming forms). The following structures are currently available:

  • Count-min sketch
  • HyperLogLog
  • Simple random sample
  • Unstable reservoir sample
  • Top-K
  • T-Digest

They operate as objects with push or pull methods that add or remove one element at a time. top-kat is written as bindings to the streaming_algorithms Rust crate, so it gets Rust's substantial performance boost along with Python's simplicity.
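The push-style interface can be illustrated with a minimal count-min sketch in plain Python. This is a hypothetical sketch rather than top-kat's code (the class, its push and estimate methods, and the SHA-1 based hashing are our own choices): a grid of counters where each row hashes an item to one column, pushing an item increments one counter per row, and the estimate is the minimum across rows.

```python
import hashlib


class CountMinSketch:
    """Minimal count-min sketch (an illustration, not top-kat's implementation)."""

    def __init__(self, width=2000, depth=5):
        self.width = width  # counters per row: more columns, fewer collisions
        self.depth = depth  # independent rows: more rows, lower failure odds
        self.table = [[0] * width for _ in range(depth)]

    def _columns(self, item):
        # One column per row, chosen by a hash salted with the row number
        for row in range(self.depth):
            digest = hashlib.sha1(f"{row}:{item}".encode("utf-8")).digest()
            yield row, int.from_bytes(digest[:8], "big") % self.width

    def push(self, item, count=1):
        # Hypothetical method name, mirroring the push-style interface above
        for row, col in self._columns(item):
            self.table[row][col] += count

    def estimate(self, item):
        # Collisions can only inflate a counter, so the smallest is the tightest bound
        return min(self.table[row][col] for row, col in self._columns(item))


cms = CountMinSketch()
for word in "the cat and the dog and the bird".split():
    cms.push(word)
print(cms.estimate("the"))  # never less than the true count of 3
```

In the textbook construction, a width of about e/ε keeps the overestimate within ε times the total count, and a depth of about ln(1/δ) makes that bound fail with probability at most δ. That tolerance-versus-probability trade-off resembles the parameters the TopK constructor takes later in this post, though top-kat's exact parameter semantics may differ.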

Using top-kat with Bytewax

For this example, we’ll be creating a simple dataflow that uses the Top-K algorithm to count the most used words on Mastodon. To start, create a basic single-partition Bytewax input that streams live posts from the original Mastodon server, mastodon.social (or a different server of your choice). You’ll need an app access token created from an account on that server, which Mastodon has made quite easy. Just log in, then go to preferences -> development -> new application -> your access token. We’ll be using urllib3 and sseclient (installed from PyPI as sseclient-py) to receive server-sent events from the streaming endpoint, since that’s more convenient than using the WebSocket API.

import urllib3
import sseclient
from bytewax.inputs import PartitionedInput, StatefulSource

class MastodonSource(StatefulSource):
  def __init__(self):
    pool = urllib3.PoolManager()
    response = pool.request(
      "GET",
      "https://streaming.mastodon.social/api/v1/streaming/public",
      preload_content=False,  # Keep the connection open and stream the body
      headers={
        "Accept": "text/event-stream",
        "Authorization": "Bearer YOUR_TOKEN_HERE"  # You can also move the token elsewhere and read it in for this step
      }
    )
    self.client = sseclient.SSEClient(response)
    self.events = self.client.events()

  def next(self):
    return next(self.events)  # Blocks until the next server-sent event arrives

  def snapshot(self):
    return None  # No resume state: this source doesn't support recovery

  def close(self):
    self.client.close()

class MastodonInput(PartitionedInput):
  def list_parts(self):
    return {"single-part"}
    
  def build_part(self, for_key, resume_state):
    assert for_key == "single-part"
    assert resume_state is None
    return MastodonSource()

Next, we’ll build the beginnings of the dataflow. Start by keeping only new posts written in English, then reduce each post to just its content.

import json
from bytewax.dataflow import Dataflow

flow = Dataflow()
flow.input("mastodon", MastodonInput())
flow.filter(lambda x: x.event == "update")  # Allow only new posts
flow.map(lambda x: json.loads(x.data))  # Parse the post's JSON payload
flow.filter(lambda x: x['language'] == "en")  # Filter out non-English posts (language detection on Mastodon is pretty bad, though)
flow.map(lambda x: x['content'])  # Get just the content
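Before wiring up a live token, the filter and map steps above can be checked offline. The sketch below is a stand-in rather than part of the original program: it replays a few hand-written events through the same logic in plain Python. The Event namedtuple only mimics the .event and .data fields of sseclient's events, and the sample posts are made up.

```python
import json
from collections import namedtuple

# Hypothetical stand-in for sseclient's events: only the fields the dataflow reads
Event = namedtuple("Event", ["event", "data"])

sample_events = [
    Event("update", json.dumps({"language": "en", "content": "<p>Hello streaming world</p>"})),
    Event("update", json.dumps({"language": "de", "content": "<p>Hallo Welt</p>"})),
    Event("delete", "109876543210"),
]


def extract_english_content(events):
    """Apply the same filter/map steps as the dataflow, in plain Python."""
    for x in events:
        if x.event != "update":       # allow only new posts
            continue
        post = json.loads(x.data)
        if post["language"] != "en":  # drop non-English posts
            continue
        yield post["content"]         # keep just the HTML content


print(list(extract_english_content(sample_events)))
# ['<p>Hello streaming world</p>']
```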

Mastodon posts are streamed as HTML, so it’s necessary to do a bit of refining to get the result we want. We’ll be using BeautifulSoup here to get just the text, excluding links.

import re
from bs4 import BeautifulSoup

def parse_map(x):  # Returns a list of words from a body of HTML
  soup = BeautifulSoup(x, features="html.parser")
  for a in soup("a"):  # Remove all links
    a.decompose()
  content = soup.get_text().replace("\n", " ").lower()
  words = re.findall(r'[a-zA-Z\'`‘’‛]+', content)  # Be careful with the various apostrophes in here, some are non-standard characters
  return words

flow.flat_map(parse_map)  # Don't forget to add this to your dataflow!
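If you would rather not depend on BeautifulSoup, a rough equivalent can be built on Python's standard-library html.parser. This is a swapped-in technique, not the approach used in this post: the TextWithoutLinks class and parse_words function are hypothetical names. Because text chunks are joined with spaces, words in adjacent paragraphs stay apart (get_text() would glue them together), while a word split across inline tags would be split in two.

```python
import re
from html.parser import HTMLParser

WORD_RE = re.compile(r"[a-zA-Z'`‘’‛]+")  # same apostrophe-aware pattern as parse_map


class TextWithoutLinks(HTMLParser):
    """Collect text from HTML, skipping anything inside <a> elements."""

    def __init__(self):
        super().__init__()
        self.link_depth = 0  # greater than 0 while inside one or more <a> tags
        self.chunks = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            self.link_depth += 1

    def handle_endtag(self, tag):
        if tag == "a" and self.link_depth:
            self.link_depth -= 1

    def handle_data(self, data):
        if not self.link_depth:
            self.chunks.append(data)


def parse_words(html):
    parser = TextWithoutLinks()
    parser.feed(html)
    parser.close()  # flush any buffered text
    text = " ".join(parser.chunks).replace("\n", " ").lower()
    return WORD_RE.findall(text)


sample = '<p>Hello <a href="https://example.com">link text</a> world!</p><p>It’s fine</p>'
print(parse_words(sample))  # ['hello', 'world', 'it’s', 'fine']
```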

And now for where the magic happens. top-kat makes its algorithms simple to use, so only a few lines of code are needed to add Top-K to our dataflow.

topk = TopK(25, .99, .002) # Top 25 words, 99% error tolerance with a .2% error probability

def push_map(x): # Adds word to the Top-K structure and emits the top counted words downstream
  topk.push(x, 1)
  return topk.top()

flow.map(push_map) # Don't forget this one either!
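To sanity-check the approximate counts on a small, finite sample, an exact baseline built on collections.Counter can help. The hypothetical running_top_k generator below mirrors push_map by emitting the current top words after every new word. It is exact, but its memory grows with the number of distinct words, which is precisely the cost TopK avoids.

```python
from collections import Counter


def running_top_k(words, k):
    """Exact running top-k: yields the k most common words after each new word."""
    counts = Counter()  # one entry per distinct word, so memory grows with the vocabulary
    for word in words:
        counts[word] += 1
        yield counts.most_common(k)


words = "the cat and the dog and the bird".split()
snapshots = list(running_top_k(words, 2))
print(snapshots[-1])  # [('the', 3), ('and', 2)]
```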

Now you can add an output of your choice to the dataflow, and voilà! You get a list made up mostly of function words that anybody could have guessed would be the most common. The full program can be found below:

import urllib3
import sseclient
import json
from bs4 import BeautifulSoup
import re
from top_kat import TopK

from bytewax.inputs import PartitionedInput, StatefulSource
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput


class MastodonSource(StatefulSource):
  def __init__(self):
    pool = urllib3.PoolManager()
    response = pool.request(
      "GET",
      "https://streaming.mastodon.social/api/v1/streaming/public",
      preload_content=False,
      headers={
        "Accept": "text/event-stream",
        "Authorization": "Bearer YOUR_TOKEN_HERE"  # You can also move the token elsewhere and read it in for this step
      }
    )
    self.client = sseclient.SSEClient(response)
    self.events = self.client.events()

  def next(self):
    return next(self.events)

  def snapshot(self):
    return None

  def close(self):
    self.client.close()


class MastodonInput(PartitionedInput):
  def list_parts(self):
    return {"single-part"}

  def build_part(self, for_key, resume_state):
    assert for_key == "single-part"
    assert resume_state is None
    return MastodonSource()


def parse_map(x):  # Returns a list of words from a body of HTML
  soup = BeautifulSoup(x, features="html.parser")
  for a in soup("a"):  # Remove all links
    a.decompose()
  content = soup.get_text().replace("\n", " ").lower()
  # Be careful with the various apostrophes in here, some are non-standard characters
  words = re.findall(r'[a-zA-Z\'`‘’‛]+', content)
  return words


topk = TopK(25, .99, .002)  # Top 25 words, 99% error tolerance with a .2% error probability


def push_map(x):  # Adds word to the Top-K structure and emits the top counted words downstream
  topk.push(x, 1)
  return topk.top()


flow = Dataflow()
flow.input("mastodon", MastodonInput())
flow.filter(lambda x: x.event == "update")  # Allow only new posts
flow.map(lambda x: json.loads(x.data))
flow.filter(lambda x: x['language'] == "en")  # Filter out non-English posts (language detection on Mastodon is pretty bad, though)
flow.map(lambda x: x['content'])  # Get just the content
flow.flat_map(parse_map)  # Transform to a list of words
flow.map(push_map)  # Count words
flow.output("output", StdOutput())