Real-Time Financial Exchange Order Book

In this example we walk through how to maintain a limit order book in real time with Bytewax and very little extra infrastructure.

We are going to:

  • Use websockets to connect to an exchange (Coinbase)
  • Set up an order book using a snapshot
  • Update the order book in real time
  • Use algorithms to trade cryptocurrencies and profit. Just kidding, this is left as an exercise for the reader.

Skill level: Intermediate
Time to complete: 15-30 min

Prerequisites

Python modules: websocket-client

Your takeaway

By the end of this tutorial you will understand how to use Bytewax to analyze financial exchange data by connecting to a websocket and using Bytewax operators to maintain and analyze an order book.

Concepts

To start off, we are going to digress into some concepts around markets, exchanges and orders.

Order Book


A Limit Order Book, or just Order Book, is a record of all outstanding limit orders. A limit order is an order to buy (bid) or sell (ask) an asset at a given price. This could facilitate the exchange of dollars for shares or, as in our case, the exchange of cryptocurrencies. On exchanges, the limit order book is constantly changing as orders are placed every fraction of a second. The order book can give a trader insight into the market, whether they are looking to determine liquidity, create liquidity, design a trading algorithm or detect when bad actors are trying to manipulate the market.

Bid and Ask


In the order book, the ask price is the lowest price that a seller is willing to sell at and the bid price is the highest price that a buyer is willing to buy at. A limit order differs from a market order in that a limit order is generally placed with four dimensions: the direction (buy/sell), the size, the price and the duration (time to expire). A market order, in comparison, has two dimensions: the direction and the size. It is up to the exchange to fill a market order, and it is filled from what is available in the order book.
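
To make these definitions concrete, here is a minimal sketch of how the best bid and ask are read off a book and how a market buy is filled from the resting asks. The book contents and the helper name fill_market_buy are made up for illustration, and real exchanges apply additional matching rules that are ignored here.

```python
def fill_market_buy(asks, size):
    """Fill a market buy of `size` units against `asks` (price -> available size).

    Returns a list of (price, filled_size) pairs, cheapest level first.
    """
    fills = []
    remaining = size
    for price in sorted(asks):  # lowest ask first
        if remaining <= 0:
            break
        take = min(remaining, asks[price])
        fills.append((price, take))
        remaining -= take
    return fills


# Hypothetical book: three ask levels and two bid levels.
asks = {101.0: 2.0, 102.0: 1.0, 103.0: 5.0}
bids = {100.0: 1.5, 99.0: 4.0}

best_ask = min(asks)  # lowest price a seller will accept
best_bid = max(bids)  # highest price a buyer will pay
fills = fill_market_buy(asks, 2.5)
print(best_bid, best_ask, fills)
```

The market buy of 2.5 units takes everything at the best ask and the remainder from the next level, which is why a market order has no price of its own.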

Level 2 Data


An exchange will generally offer a few different tiers of information that traders can subscribe to. These can be quite expensive on some exchanges, but luckily for us, most crypto exchanges provide access to the various levels for free! To maintain our order book we need at least level2 order information. This gives us the size available at each price level, so that we can adjust the book in real time. Without the ability to maintain our own order book, a snapshot would be out of date almost instantly and we could not act on current information. We would have to download the full order book every millisecond, or faster, to stay up to date, and since the order book can be quite large, this isn't really feasible.

Alright, let's get started!

Websocket Input

We are eventually going to create a cluster of dataflows where multiple currency pairs can run in parallel on different workers. To start, we will build a websocket input function that can handle multiple workers. The input uses the Coinbase Pro websocket URL (wss://ws-feed.pro.coinbase.com) and the Python websocket library to create a connection. Once connected, we send a message to the websocket subscribing to product_ids (pairs of currencies, BTC-USD for this example) and channels (level2 order book data). Finally, since we know the first response will be an acknowledgement message, we grab it with ws.recv() and print it out.
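
The steps above can be sketched roughly as follows. This assumes the websocket-client package (imported as websocket) and the JSON subscribe message format used by the Coinbase feed; the helper name subscribe_message is ours, and the state argument is passed through untouched so the generator has the same shape as the file-based version shown later.

```python
import json

WS_URL = "wss://ws-feed.pro.coinbase.com"  # URL given in the text


def subscribe_message(product_ids):
    """Build the JSON request subscribing to the level2 channel."""
    return json.dumps(
        {"type": "subscribe", "product_ids": product_ids, "channels": ["level2"]}
    )


def ws_input(product_ids, state):
    # Imported lazily so subscribe_message() works without the package;
    # requires `pip install websocket-client`.
    from websocket import create_connection

    ws = create_connection(WS_URL)
    ws.send(subscribe_message(product_ids))
    # The first message is the subscription acknowledgement.
    print(ws.recv())
    while True:
        yield state, ws.recv()
```

Because ws_input is a generator, no connection is opened until the first item is requested.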

For testing purposes, if you don't want to connect to the live feed, you can read sample websocket data from a local file containing a list of JSON responses. You can use the data in the GitHub repo to do so with the code shown below.

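def ws_input(product_ids, state):
    # Simulate the subscription acknowledgement the live feed would send.
    print(
        '{"type":"subscriptions","channels":[{"name":"level2","product_ids":["BTC-USD","ETH-USD"]}]}'
    )
    # Replay the recorded messages, one JSON document per line.
    with open("crypto-sockets.json") as f:
        for msg in f:
            yield state, msg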

Now that we have our websocket-based data generator built, we will write an input builder for our dataflow. Since we're using a manual input builder, we'll pass a ManualInputConfig as our input with the builder as a parameter. The input builder is called on each worker and receives the worker_index and the total number of workers (worker_count). In this case we are designing our input builder to handle multiple workers and multiple currency pairs, so that we can parallelize the input. The builder contains some custom logic to distribute the currency pairs across the workers. Note that if you run more than one worker with only one currency pair, the other workers will not be used.
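
One possible way to split the pairs across workers is sketched below. The round-robin scheme, the helper names pairs_for_worker and make_input_builder, and the third builder argument (resume_state) are assumptions for illustration; check the builder signature expected by ManualInputConfig in your Bytewax version.

```python
def pairs_for_worker(product_ids, worker_index, worker_count):
    """Round-robin split: worker i takes items i, i + worker_count, ..."""
    return product_ids[worker_index::worker_count]


def make_input_builder(product_ids, source):
    """Return a builder that hands each worker its own share of the pairs.

    `source` is a generator function such as ws_input(product_ids, state).
    """

    def input_builder(worker_index, worker_count, resume_state=None):
        mine = pairs_for_worker(product_ids, worker_index, worker_count)
        return source(mine, resume_state)

    return input_builder


# Hypothetical list of pairs split over two workers.
PRODUCT_IDS = ["BTC-USD", "ETH-USD", "SOL-USD"]
print(pairs_for_worker(PRODUCT_IDS, 0, 2))
print(pairs_for_worker(PRODUCT_IDS, 1, 2))
```

With a single pair and two workers, the second worker receives an empty list, which matches the note above that extra workers go unused.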

Constructing The Dataflow

Before we get to the exciting part of our order book dataflow we need to create our dataflow object and prep the data. We start with creating the dataflow object, specifying the input we built above and then we will add two map operators. The first will deserialize the JSON we are receiving from the websocket into a dictionary. Once deserialized, we can reformat the data to be a tuple of the shape (product_id, data). This will permit us to aggregate by the product_id as our key in the next step.
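
The two preparation steps can be sketched as plain functions that would each be passed to a map operator. The function name key_on_product and the sample message are illustrative.

```python
import json


def key_on_product(data):
    """Turn a decoded message into a (product_id, data) tuple."""
    return data["product_id"], data


raw = '{"type": "l2update", "product_id": "BTC-USD", "changes": [["buy", "10101.80", "0.16"]]}'

# Step 1: deserialize the JSON string; step 2: key it by product_id.
keyed = key_on_product(json.loads(raw))
print(keyed[0])
```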

Now for the exciting part.

  1. Construct the order book as two dictionaries, one for asks and the other for bids.
  2. Assign a value to the ask and bid price.
  3. For each new order, update the order book and then update the bid and ask prices where required.
  4. Return the bid and ask prices, their respective volumes and the difference between the prices.

The data from the Coinbase Pro websocket is first a snapshot of the current order book in the format:

{
  "type": "snapshot",
  "product_id": "BTC-USD",
  "bids": [["10101.10", "0.45054140"]...],
  "asks": [["10102.55", "0.57753524"]...]
}

and then each additional message is an update with a new limit order of the format:

{
  "type": "l2update",
  "product_id": "BTC-USD",
  "time": "2019-08-14T20:42:27.265Z",
  "changes": [
    [
      "buy",
      "10101.80000000",
      "0.162567"
    ]
  ]
}

This flow of receiving a snapshot followed by real-time updates works well for our scenario with respect to recovery. If we lose the connection, we do not need to restore any state: on reconnecting we receive a fresh snapshot and can rebuild the book from it.

To maintain an order book in real time, we first need to construct an object to hold the orders from the snapshot and then update that object with each subsequent message. This is a good use case for the stateful_map operator, which aggregates data by key. stateful_map aggregates data, using a function (the mapper), into an object that you define. The object must be created by a builder, because the operator calls this builder to create a new object for each new key it receives. The mapper must return the object so that the updated state is kept for the next item.
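
The builder and mapper contract can be illustrated with a small pure-Python emulation. This is not Bytewax's implementation; it only mimics the behavior described above, under the assumption that the mapper returns a pair of (updated state, output). The names run_stateful_map and running_total are made up for this sketch.

```python
def run_stateful_map(items, builder, mapper):
    """Emulate a keyed stateful map over (key, value) items."""
    states = {}
    for key, value in items:
        if key not in states:
            states[key] = builder()  # new state object for a new key
        states[key], output = mapper(states[key], value)
        yield key, output


def running_total(total, size):
    """Mapper: add the new size and emit the running total."""
    total += size
    return total, total


items = [("BTC-USD", 1.0), ("ETH-USD", 2.0), ("BTC-USD", 0.5)]
results = list(run_stateful_map(items, lambda: 0.0, running_total))
print(results)
```

Each key accumulates independently, which is what lets one dataflow track a separate order book per currency pair.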

Below we have the code for the OrderBook object, which has a bids dictionary and an asks dictionary. These are first used to create the order book from the snapshot, and once it is created we can obtain the first bid price and ask price. The bid price is the highest buy order placed and the ask price is the lowest sell order placed. Once we have determined the bid and ask prices, we can calculate the spread and track that as well.

With our snapshot processed, for each new message we receive from the websocket, we can update the order book, the bid and ask price and the spread via the update method. Sometimes an order was filled or it was cancelled and in this case what we receive from the update is something similar to 'changes': [['buy', '36905.39', '0.00000000']]. When we receive these updates of size '0.00000000', we can remove that item from our book and potentially update our bid and ask price. The code below will check if the order should be removed and if not it will update the order. If the order was removed, it will check to make sure the bid and ask prices are modified if required.
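
A minimal sketch of such an OrderBook is shown here, assuming the snapshot and l2update message shapes listed earlier. For brevity it recomputes the best bid and ask with max and min after every message rather than adjusting them only when a level is removed, so it is a simplification and not necessarily the tutorial's original code.

```python
class OrderBook:
    def __init__(self):
        self.bids = {}  # price -> size
        self.asks = {}  # price -> size
        self.bid_price = None
        self.ask_price = None

    def update(self, data):
        if data["type"] == "snapshot":
            self.bids = {float(p): float(s) for p, s in data["bids"]}
            self.asks = {float(p): float(s) for p, s in data["asks"]}
        else:
            for side, price, size in data["changes"]:
                book = self.bids if side == "buy" else self.asks
                price, size = float(price), float(size)
                if size == 0.0:
                    book.pop(price, None)  # level was filled or cancelled
                else:
                    book[price] = size
        # Recompute the best prices; O(n) per message, kept simple for clarity.
        self.bid_price = max(self.bids) if self.bids else None
        self.ask_price = min(self.asks) if self.asks else None
        return self, self.summary()

    def summary(self):
        if self.bid_price is None or self.ask_price is None:
            return None
        return (
            self.bid_price,
            self.bids[self.bid_price],
            self.ask_price,
            self.asks[self.ask_price],
            self.ask_price - self.bid_price,  # the spread
        )


# Made-up snapshot, followed by an update that removes the best bid.
book = OrderBook()
snapshot = {
    "type": "snapshot",
    "product_id": "BTC-USD",
    "bids": [["100.0", "1.0"], ["99.0", "2.0"]],
    "asks": [["101.0", "0.5"], ["102.0", "3.0"]],
}
book, top1 = book.update(snapshot)
removal = {"type": "l2update", "product_id": "BTC-USD", "changes": [["buy", "100.0", "0.00000000"]]}
book, top2 = book.update(removal)
print(top1, top2)
```

Because update returns the book itself together with the summary tuple, it has the shape a stateful mapper needs, with OrderBook serving as the builder.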

Finishing it up, for fun we can filter for a spread as a percentage of the ask price greater than 0.1% and then capture the output. Maybe we can profit off of this spread... or maybe not.
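
The filter predicate could look like the sketch below, assuming items shaped as (product_id, (bid, bid_size, ask, ask_size, spread)). The function name spread_exceeds and the sample items are illustrative.

```python
def spread_exceeds(item, threshold=0.001):
    """True when spread / ask price is above `threshold` (0.001 == 0.1%)."""
    _product_id, (_bid, _bid_size, ask, _ask_size, spread) = item
    return spread / ask > threshold


wide = ("BTC-USD", (99.0, 2.0, 100.0, 0.5, 1.0))  # spread is 1% of the ask
narrow = ("BTC-USD", (99.99, 2.0, 100.0, 0.5, 0.01))  # spread is 0.01% of the ask
print(spread_exceeds(wide), spread_exceeds(narrow))
```

Note that the sample output lines at the end of this tutorial show a spread of roughly 0.017% of the ask price, so they would only pass a lower threshold such as 0.0001 (0.01%).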

Executing the Dataflow

Bytewax provides a few different entry points for executing a dataflow. In this example, we are using the spawn_cluster method, which creates the number of worker processes and threads configured in the command. In this scenario each worker opens its own websocket connection, which is what allows the input to be parallelized across currency pairs.

If you want to run the dataflow, you can do so locally or you can use the Dockerfile to run it in a containerized environment.

Now we can build the Dockerfile and run the dataflow in a docker container with multiple processes.

Eventually, if the spread is greater than 0.1%, we will see some output similar to what is below.

{"type":"subscriptions","channels":[{"name":"level2","product_ids":["BTC-USD"]}]}
('BTC-USD', (38590.1, 0.00945844, 38596.73, 0.01347429, 6.630000000004657))
('BTC-USD', (38590.1, 0.00945844, 38597.13, 0.02591147, 7.029999999998836))
('BTC-USD', (38590.1, 0.00945844, 38597.13, 0.02591147, 7.029999999998836))