Difference between revisions of "Programming/Kdb/Labs/Feedhandler and kdb tick environment"

From Thalesians Wiki
[[File:bitmex_rdb.png]]
</center>
=Visualization tool=
Finally, we prototype a (very basic) visualization tool for our data on the basis of the Python tickerplant subscriber that we have introduced above. Here is the code, which can be improved in many ways:
<pre>
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import numpy as np
import threading
from qpython import qconnection
from qpython.qtype import QException
from qpython.qconnection import MessageType
from qpython.qcollection import QTable
class ListenerThread(threading.Thread):
    def __init__(self, q, data_class):
        super(ListenerThread, self).__init__()
        self.q = q
        self._data_class = data_class
        self._stopper = threading.Event()
    def stopit(self):
        self._stopper.set()
    def stopped(self):
        return self._stopper.is_set()
    def run(self):
        while not self.stopped():
            print('.')
            try:
                message = self.q.receive(data_only = False, raw = False) # retrieve entire message
                if message.type != MessageType.ASYNC:
                    print('Unexpected message, expected message of type: ASYNC')
                print('type: %s, message type: %s, data size: %s, is_compressed: %s ' % (type(message), message.type, message.size, message.is_compressed))
                if isinstance(message.data, list):
                    # unpack upd message
                    if len(message.data) == 3 and message.data[0] == b'upd' and isinstance(message.data[2], QTable):
                        for row in message.data[2]:
                            print(row)
                            print(row[2], row[6])
                            self._data_class.x_data.append(row[2])
                            self._data_class.y_data.append(row[6])
            except QException as e:
                print(e)
class DataClass():
    def __init__(self):
        self.x_data = []
        self.y_data = []
class PlotClass():
    def __init__(self, data_class):
        self._data_class = data_class
        self.line, = plt.plot([], [], 'o')
        self.ani = FuncAnimation(plt.gcf(), self.run, interval=1000, repeat=True)
    def run(self, i): 
        print("plotting data")
        self.line.set_data(self._data_class.x_data, self._data_class.y_data)
        self.line.axes.relim()
        self.line.axes.autoscale_view()
with qconnection.QConnection(host = 'localhost', port = 5010) as q:
    print(q)
    print('IPC version: %s. Is connected: %s' % (q.protocol_version, q.is_connected()))
    print('Press <ENTER> to close application')
    # subscribe to tick
    print(dir(q))
    response = q.sendSync('.u.sub', np.string_('trade'), np.string_(''))
    # get table model
    if isinstance(response[1], QTable):
        print('%s table data model: %s' % (response[0], response[1].dtype))
    data = DataClass()
    plotter = PlotClass(data)
    t = ListenerThread(q, data)
    t.start()
    plt.show()
</pre>
And here is the result of running it for a few minutes:

Revision as of 01:33, 20 June 2021

Cryptocurrency

A cryptocurrency, crypto-currency, or crypto is a digital asset designed to work as a medium of exchange wherein individual coin ownership records are stored in a ledger existing in a form of a computerized database using strong cryptography to secure transaction records, to control the creation of additional coins, and to verify the transfer of coin ownership. It typically does not exist in physical form (like paper money) and is typically not issued by a central authority. Cryptocurrencies typically use decentralized control as opposed to centralized digital currency and central banking systems. When a cryptocurrency is minted or created prior to issuance or issued by a single issuer, it is generally considered centralized. When implemented with decentralized control, each cryptocurrency works through distributed ledger technology, typically a blockchain, that serves as a public financial transaction database.

Bitcoin, first released as open-source software in 2009, is the first decentralized cryptocurrency. Since the release of bitcoin, other cryptocurrencies have been created.

Cryptocurrency exchanges

A cryptocurrency exchange, or a digital currency exchange (DCE), is a business that allows customers to trade cryptocurrencies or digital currencies for other assets, such as conventional fiat money or other digital currencies. Exchanges may accept credit card payments, wire transfers or other forms of payment in exchange for digital currencies or cryptocurrencies. A cryptocurrency exchange can be a market maker that typically takes the bid-ask spreads as a transaction commission for its service or, as a matching platform, simply charges fees.

Here is a list of some major cryptocurrency exchanges:

APIs

API is an acronym for Application Programming Interface. In general, an API is an interface that allows two unrelated systems to interact with each other. An API specification describes the way that each of the two systems must interact with the API itself (such as the language that must be used, the syntax of messages that are passed back and forth, how frequently the messages can be sent, etc.).

Specifically for cryptocurrency trading, an API enables the client to interact with the exchange programmatically (via software instead of a human interface), allowing the client to obtain real-time market data, make trades, and manage the user's account.

A few of the tasks that can be accomplished via an API:

  • download historical market data;
  • stream real-time market data;
  • place and cancel trading orders;
  • view account balances;
  • download trading history;
  • make funding transactions...

Cryptocurrency exchange APIs are typically implemented using REST or the WebSocket protocol.

REST

Representational state transfer (REST) is a software architectural style that was created to guide the design and development of the architecture for the World Wide Web. REST defines a set of constraints for how the architecture of an Internet-scale distributed hypermedia system, such as the Web, should behave. The REST architectural style emphasizes the scalability of interactions between components, uniform interfaces, independent deployment of components, and the creation of a layered architecture to facilitate caching components to reduce user-perceived latency, enforce security, and encapsulate legacy systems.

A web service that obeys the REST constraints is informally described as RESTful. Such a web service must provide its web resources in a textual representation and allow them to be read and modified with a stateless protocol and a predetermined set of operations. This approach allows the greatest interoperability between clients and servers in a long-lived Internet-scale environment which crosses organisational boundaries.

In a RESTful web service, requests made to a resource's URI elicit a response with a payload formatted in HTML, XML, JSON, or some other format. For example, the response can confirm that the resource state has been changed. The response can include hypertext links to related resources. The most common protocol for these requests and responses is HTTP. It provides operations (HTTP methods) such as GET, POST, PUT, and DELETE. By using a stateless protocol and standard operations, RESTful systems aim for fast performance, reliability, and the ability to grow by reusing components that can be managed and updated without affecting the system as a whole, even while it is running.
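
As a rough illustration of the uniform interface, the following sketch builds (but does not send) two HTTP requests against a REST-style API. The host api.example.com, the trades and orders resources, and the parameter names are assumptions invented for this example; they do not describe any real exchange.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request

# Hypothetical base URL: an assumption for illustration only.
BASE_URL = 'https://api.example.com/v1'

def build_get(resource, **params):
    """Build a GET request that reads a resource; all state travels in the URL."""
    url = '%s/%s?%s' % (BASE_URL, resource, urlencode(params))
    return Request(url, method='GET')

def build_post(resource, payload):
    """Build a POST request that creates a resource from a JSON payload."""
    body = json.dumps(payload).encode('utf-8')
    return Request('%s/%s' % (BASE_URL, resource), data=body,
                   headers={'Content-Type': 'application/json'}, method='POST')

# Each request is self-contained, so the server needs no memory of earlier calls.
read_trades = build_get('trades', symbol='XBTUSD', count=10)
place_order = build_post('orders', {'symbol': 'XBTUSD', 'side': 'Buy', 'size': 100})

print(read_trades.get_method(), read_trades.full_url)
print(place_order.get_method(), place_order.full_url, place_order.data)
```

Passing either object to urllib.request.urlopen would send it; that step is left out here because the endpoint is fictitious.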

WebSocket

WebSocket is a computer communications protocol, providing full-duplex communication channels over a single TCP connection. The WebSocket protocol was standardized by the IETF as RFC 6455 in 2011, and the WebSocket API in Web IDL is being standardized by the W3C.

WebSocket is distinct from HTTP. Both protocols are located at layer 7 in the OSI model and depend on TCP at layer 4. Although they are different, RFC 6455 states that WebSocket

is designed to work over HTTP ports 443 and 80 as well as to support HTTP proxies and intermediaries,

thus making it compatible with HTTP. To achieve compatibility, the WebSocket handshake uses the HTTP Upgrade header to change from the HTTP protocol to the WebSocket protocol.
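
To make the upgrade step more concrete, here is a minimal sketch of the handshake arithmetic defined in RFC 6455: the client sends a random Sec-WebSocket-Key, and the server proves that it understood the request by returning the Base64-encoded SHA-1 hash of that key concatenated with a fixed GUID. The function names and the host and path in the usage lines are our own choices for illustration; in practice a library such as websocket_client performs this exchange for us.

```python
import base64
import hashlib

# Fixed GUID that RFC 6455 prescribes for computing the handshake response.
WEBSOCKET_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'

def accept_key(client_key):
    """Return the Sec-WebSocket-Accept value for a given Sec-WebSocket-Key."""
    digest = hashlib.sha1((client_key + WEBSOCKET_GUID).encode('ascii')).digest()
    return base64.b64encode(digest).decode('ascii')

def upgrade_request(host, path, client_key):
    """Return the lines of the HTTP request that asks to switch protocols."""
    return [
        'GET %s HTTP/1.1' % path,
        'Host: %s' % host,
        'Upgrade: websocket',
        'Connection: Upgrade',
        'Sec-WebSocket-Key: %s' % client_key,
        'Sec-WebSocket-Version: 13',
    ]

# Sample key taken from the example in RFC 6455.
sample_key = 'dGhlIHNhbXBsZSBub25jZQ=='
print('\r\n'.join(upgrade_request('www.example.com', '/realtime', sample_key)))
print('Sec-WebSocket-Accept: %s' % accept_key(sample_key))
```

For the sample key, the computed value is s3pPLMBiTxaQ9kYGzzhZRbK+xOo=, which matches the example given in the RFC.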

Schematically the WebSocket connection can be represented as follows:

Websocket connection.png

REST versus WebSocket

Priya Pademkar lists some differences between the REST and WebSocket APIs:

  • WebSocket uses HTTP only to establish the initial connection, whereas HTTP is the common protocol in RESTful web services.
  • WebSocket communication is based on the socket concept, whereas REST is based on the resources concept rather than commands.
  • WebSocket relies on an IP address and port number, whereas REST is based on the HTTP protocol and uses HTTP methods to relay data.
  • The cost of communication is lower for WebSocket than for REST.
  • WebSocket is better with high loads, whereas REST is more commonly employed for occasional communication.
  • WebSocket is a stateful protocol, whereas REST is based on HTTP, which is a stateless protocol.

The following diagram by Emmanuel Picard emphasizes the differences between REST and event-driven APIs (such as those based on WebSocket):

Rest versus event driven apis.jpg

FIX and related protocols

Few of the cryptocurrency exchanges use FIX and other specialized financial protocols. For completeness we mention them here.

The Financial Information eXchange (FIX) protocol is an electronic communications protocol initiated in 1992 for international real-time exchange of information related to securities transactions and markets. With trillions of dollars traded annually on the NASDAQ alone, financial service entities are employing direct market access (DMA) to increase their speed to financial markets. Managing the delivery of trading applications and keeping latency low increasingly requires an understanding of the FIX protocol.
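
For readers who have not seen FIX on the wire, the sketch below assembles and validates a message in the classic tag=value encoding, where fields are separated by the SOH character (0x01), BodyLength (tag 9) counts the bytes between the BodyLength field and the CheckSum field, and CheckSum (tag 10) is the sum of all preceding bytes modulo 256. The helper names, the FIX.4.4 version string, and the CLIENT and EXCHANGE identifiers are assumptions made for this illustration; a production system would use a FIX engine rather than hand-rolled code.

```python
SOH = '\x01'  # field delimiter used on the wire

def checksum(data):
    """Sum of all bytes modulo 256, rendered as three digits."""
    return '%03d' % (sum(data.encode('ascii')) % 256)

def build_message(msg_type, fields, begin_string='FIX.4.4'):
    """Assemble a message, computing BodyLength (9) and CheckSum (10)."""
    parts = ['35=%s' % msg_type] + ['%d=%s' % (tag, value) for tag, value in fields]
    body = SOH.join(parts) + SOH
    head = '8=%s%s9=%d%s' % (begin_string, SOH, len(body), SOH)
    return head + body + '10=%s%s' % (checksum(head + body), SOH)

def parse_message(raw):
    """Split a raw message into a list of (tag, value) string pairs."""
    return [tuple(field.split('=', 1)) for field in raw.split(SOH) if field]

def is_valid(raw):
    """Check that the trailing CheckSum field matches the preceding bytes."""
    start = raw.rindex(SOH + '10=') + 1
    return checksum(raw[:start]) == raw[start + 3:-1]

# A Heartbeat (MsgType 0) with hypothetical sender and target identifiers.
heartbeat = build_message('0', [
    (49, 'CLIENT'),                 # SenderCompID
    (56, 'EXCHANGE'),               # TargetCompID
    (34, 1),                        # MsgSeqNum
    (52, '20210619-23:12:15.550'),  # SendingTime
])
print(heartbeat.replace(SOH, '|'))
print(parse_message(heartbeat))
print(is_valid(heartbeat))
```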

The following diagram by Gareth Randall shows how FIX messaging looks between Buyside/Customer and Sellside/Supplier:

Fix system connectivity diagram.png

Other finance-specific protocols include FIX Adapted for STreaming (FAST), ITCH, and OUCH. See this page for more details.

Kdb+tick

Kdb+tick is an architecture which allows the capture, processing, and querying of real-time and historical data. It can be used to capture, store, and analyze massive volumes of data in real-time.

It is shipped in the form of a q library and consists of three q files:

  • tick.q — the tickerplant process script;
  • tick/r.q — the vanilla real-time subscriber process, known as the real-time database (rdb);
  • tick/u.q — loaded by tick.q, contains the definitions of the .u functions: .u.init, .u.del, .u.sel, .u.pub, .u.add, .u.sub, and .u.end.

Kdb+tick can be downloaded from https://github.com/KxSystems/kdb-tick.

Installation

To install kdb+tick, place these files under the QHOME directory (e.g. C:/q), so you end up with:

  • .../q
    • tick
      • r.q
      • u.q
    • w64 (or w32, or m32, or ..., depending on your OS and whether you are using 32-bit or 64-bit kdb+/q)
      • ...
    • q.k
    • q.q
    • README.txt
    • s.k
    • sp.q
    • tick.q
    • trade.q

Architecture

The architecture of a kdb+tick system is as follows. (The diagram is reproduced from Building real-time tick subscribers by Nathan Perrem, Kx.)

Kdb tick architecture.png

The components are as follows:

  • The data feed — a stream of data updates from a data source, e.g. order book updates and trades from the BitMEX exchange.
  • The feedhandler (fh) — a process that captures the data from the data feed and feeds it into the tickerplant.
  • The tickerplant (tp) — a kdb+/q instance that publishes data records to the real-time database and other real-time subscribers. Constitutes the first element of the kdb+tick triplet.
  • The real-time database (rdb) — a kdb+/q instance that subscribes to data records on the tickerplant and stores them in an in-memory table. Today's data can be queried on the rdb. The rdb constitutes the second element of the kdb+tick triplet.
  • The historical database (hdb) — a kdb+/q instance that stores the data up to and including yesterday in a splayed and partitioned (by date) table on disc. Can be queried. Constitutes the third element of the kdb+tick triplet.
  • The real-time subscriber (rts) — a kdb+/q instance or another process that subscribes to data records on the tickerplant. This process may compute certain metrics, such as the volume weighted average price (VWAP), visualize the data, etc. Optional.
  • The binary log file contains the data for the current date written by the tp. Used to restore the rdb should the rdb process crash or require restarting for whatever reason.
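
As an example of what a real-time subscriber might compute, here is a minimal sketch of a running VWAP that is updated one trade at a time. The class and method names are our own, and the sketch is plain Python that stands apart from kdb+tick; it only illustrates the arithmetic, namely the cumulative sum of price times size divided by the cumulative size.

```python
class RunningVwap:
    """Keep per-symbol running totals so that VWAP can be updated trade by trade."""

    def __init__(self):
        self._notional = {}  # sym -> cumulative sum of price * size
        self._volume = {}    # sym -> cumulative sum of size

    def on_trade(self, sym, price, size):
        """Fold one trade into the totals and return the updated VWAP."""
        self._notional[sym] = self._notional.get(sym, 0.0) + price * size
        self._volume[sym] = self._volume.get(sym, 0.0) + size
        return self.vwap(sym)

    def vwap(self, sym):
        """Return the VWAP for sym, or None if no volume has been seen yet."""
        volume = self._volume.get(sym, 0.0)
        return self._notional[sym] / volume if volume else None

calc = RunningVwap()
calc.on_trade('XBTUSD', 100.0, 1)
print(calc.on_trade('XBTUSD', 200.0, 3))
```

Because only two running totals are kept per symbol, the subscriber does not need to store the individual trades.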

The sequence of events

The sequence of events is as follows:

  • The tickerplant tick.q starts.
    • Load .q file that contains table schemas.
    • Load u.q that contains the definitions of the .u functions: .u.init, .u.del, .u.sel, .u.pub, .u.add, .u.sub, .u.end.
    • Define the tickerplant-specific functions:
      • .u.endofday — handles the end-of-day saving of the data and starting of the new log file.
      • .z.ts override — publishes updates to subscribers in a timed loop.
      • .u.upd — accepts data updates from feedhandlers, adding the time column where needed and buffering for later publishing.
    • .u.tick[]
      • Create an empty subscriber list.
      • Check that all tables have time/sym columns, apply the g# attribute.
      • Open the log file.
    • Wait for .u.upd updates to be received, buffer them, publish to subscribers on a timed loop.
  • The real-time database tick/r.q starts.
    • Define the rdb-specific functions:
      • .u.end — save the rdb contents to the hdb directory at end of day.
      • .u.rep — replay the log file, go to the hdb directory.
      • upd — accept updates by inserting them into the in-memory table.
    • Keep appending updates to in-memory tables until another command received.
    • Answer queries as and when received.
  • The historical database starts.
    • Load the directory as an on-disc database.
    • Answer queries as and when received.

The tickerplant

Two particularly important functions are defined on the tickerplant:

  • .u.upd[tableName;tableData] inserts tableData into the table specified by tableName. This function is normally called from a feedhandler. It takes the tableData, adds a time column if one is absent, inserts it into the in-memory table, appends to the log file, and increases the counter of log file records .u.i. For example:
.u.upd[`quote;(`AAPL`MSFT;99.12 22.39;100.12 23.40;29 31;25 35;"A","A";"N","N")]
  • .u.sub[tableName;listOfSyms] subscribes to the syms specified by listOfSyms in table tableName. Once this function has been called, the remote process, via the upd function, will receive any updates on tableName for the syms in listOfSyms. An empty symbol ` for any argument stands for "all": it subscribes to data for all tables and/or syms. For example:
h:hopen`:localhost:5010;
upd:{[x;y]0N!raze"received update on table ",string[x],":";0N!y};
h(".u.sub";`trade;`XBTUSD`EURUSD)
This will subscribe to updates on symbols `XBTUSD and `EURUSD in table `trade; to subscribe to updates on all symbols in that table, use
h(".u.sub";`trade;`)

The following important variables are defined on a tickerplant:

  • .u.t:`quote`trade — tables that exist
  • .u.d:2021.06.01 — the date
  • .u.l:636i — log file handle
  • .u.L:`:./bitmex2021.06.01 — path to log file
  • .u.i:98720 — number of messages in the log file
  • .u.w: contains a dictionary from table names to a list of lists of the form (handle;subscribed syms). For example:
quote| ,(680i;`)
trade| ((680i;`);(984i;`XBTUSD`EURUSD))
The example shows a subscription from the rdb to the quote table for all syms (`) on socket handle 680i, a subscription from the rdb to the trade table for all syms (`) on socket handle 680i, and a subscription from another process on socket handle 984i to the trade table for syms `XBTUSD and `EURUSD.
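
The way the tickerplant uses this dictionary can be sketched in plain Python. The model below mirrors the example above: each table maps to a list of (handle, syms) pairs, and an update is delivered to a handle only if some of its rows match the subscribed syms. The names ALL, select_rows, and publish are hypothetical, and the code is an approximation of the behaviour described here, not a transcription of u.q.

```python
ALL = None  # stands in for the empty symbol `, meaning "all syms"

# Python model of .u.w from the example above.
subscriptions = {
    'quote': [(680, ALL)],
    'trade': [(680, ALL), (984, ['XBTUSD', 'EURUSD'])],
}

def select_rows(rows, syms):
    """Keep only the rows whose sym the subscriber asked for."""
    if syms is ALL:
        return list(rows)
    return [row for row in rows if row['sym'] in syms]

def publish(w, table, rows):
    """Return {handle: rows to send}; handles with no matching rows are skipped."""
    deliveries = {}
    for handle, syms in w.get(table, []):
        selected = select_rows(rows, syms)
        if selected:
            deliveries[handle] = selected
    return deliveries

update = [
    {'sym': 'XBTUSD', 'price': 35508.5},
    {'sym': 'ETHUSD', 'price': 2200.0},
]
print(publish(subscriptions, 'trade', update))
```

Here handle 680 receives both rows, whereas handle 984 receives only the XBTUSD row.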

Setting up kdb+tick

Create the file C:/q/tick/example.q (replacing C:/q with QHOME on your machine) with the following contents:

quote:([]time:`timespan$(); sym:`g#`symbol$(); bid:`float$(); ask:`float$(); bsize:`long$(); asize:`long$(); mode:`char$(); ex:`char$())
trade:([]time:`timespan$(); sym:`g#`symbol$(); price:`float$(); size:`int$(); stop:`boolean$(); cond:`char$(); ex:`char$())

This file defines the schemas for the quote and trade tables.

Launching the tickerplant

To launch the tickerplant, go to the location of the q executable (on Windows, q.exe) and launch it with the following command line:

cd C:/q/w64
q tick.q example . -p 5010

tick.q will be loaded from the QHOME directory (in our case, C:/q). The command line tells the tickerplant to load the table schemas from tick/example.q (the example argument) and write the log file in the current directory (C:/q/w64, the . argument). This will start the tickerplant on port 5010 (the -p 5010 command line option).

Once the tickerplant is up and running, you should see the log file C:\q\w64\example2021.06.19 appear (the date in the file name will, of course, be the current date).

The tickerplant will not start unless the file with the table schemas, C:\q\tick\example.q, is in place. If it is missing, you may see the following error message:

'tick/example.q. OS reports: The system cannot find the file specified.
  [3]  \l tick/example.q
       ^

Launching the rdb

Make sure the tickerplant is running before you launch the rdb, since the rdb will attempt to connect to the tickerplant.

To launch the rdb, go to the location of the q executable (on Windows, q.exe), and launch it with the following command line:

cd C:/q/w64
q tick/r.q -p 5011

tick/r.q will be loaded from under the QHOME directory (in our case, C:/q). This will start the rdb on port 5011 (the -p 5011 command line option).

The rdb will attempt to connect to the tickerplant, which it looks for, by default, on port 5010. If the tickerplant is not found, the rdb will produce the following error message:

'hop. OS reports: No connection could be made because the target machine actively refused it.
  [2]  c:\q\tick\r.q:19: .u.rep .(hopen `$":",.u.x 0)"(.u.sub[`;`];`.u `i`L)";
                                  ^
  [0]  (<load>)

  )

If you launched the tickerplant on a different port, say on 5008, you need to specify it as the command line parameter to tick/r.q as follows:

q tick/r.q :5008 -p 5011

The full syntax for specifying the location of the tickerplant and the hdb, respectively, is

q tick/r.q [host]:port[:usr:pwd] [host]:port[:usr:pwd]

By default, tick/r.q looks for the tickerplant on localhost:5010 and for the hdb on localhost:5012.

The hdb is optional. It may or may not be running.

We will assume that the tickerplant is running on localhost:5010.

Inspecting the tickerplant and the rdb

If we now open http://localhost:5011/ in the browser, we will see that the rdb has been initialized with the schemas from C:/q/tick/example.q, but the two tables are empty:

Empty rdb.png

We see the same empty tables if we examine the tickerplant on http://localhost:5010/.

Updating the tickerplant from kdb+/q

Now suppose that a trade and two quotes have arrived. We need to update the tickerplant. We run the following code on the tickerplant (either by connecting to the tickerplant from an IDE, such as Q Insight Pad, or by typing the code directly into the tickerplant's terminal window):

q).u.upd[`trade;(enlist`IBM;enlist 56.14;enlist 89i;enlist 0b;enlist"Z";enlist"N")]
q).u.upd[`quote;(`AAPL`MSFT;99.12 22.39;100.12 23.40;29 31;25 35;"AA";"NN")]

If we now inspect the rdb, http://localhost:5011/, from the browser, we'll see that the tables are no longer empty:

Nonempty rdb.png

On the tickerplant, http://localhost:5010/, the two tables remain empty. This is because the tickerplant does not keep the data in its tables. It writes the data to the log file, C:/q/w64/example2021.06.19, and publishes it to the rdb. If the rdb is restarted (try it), the log file will be replayed, and the data will reappear in the rdb's tables. The log file preserves the integrity of the data, should the rdb crash.
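
The division of labour between the tickerplant, the log file, and the rdb can be imitated with a toy model in plain Python. The classes ToyTickerplant and ToyRdb are hypothetical stand-ins written for this illustration: a Python list plays the role of the binary log file, and a restarted rdb rebuilds its tables by replaying that list.

```python
class ToyTickerplant:
    """Logs every update and forwards it to subscribers; keeps no table data."""

    def __init__(self):
        self.log = []          # stands in for the binary log file
        self.subscribers = []  # callables taking (table, row)

    def upd(self, table, row):
        self.log.append((table, row))      # persist first...
        for callback in self.subscribers:  # ...then publish
            callback(table, row)

class ToyRdb:
    """Holds today's data in memory and can rebuild it from the log."""

    def __init__(self):
        self.tables = {}

    def upd(self, table, row):
        self.tables.setdefault(table, []).append(row)

    def replay(self, log):
        for table, row in log:
            self.upd(table, row)

tp = ToyTickerplant()
rdb = ToyRdb()
tp.subscribers.append(rdb.upd)
tp.upd('trade', ('IBM', 56.14, 89))
tp.upd('quote', ('AAPL', 99.12, 100.12))

# Simulate an rdb restart: a fresh instance recovers its state from the log.
restarted = ToyRdb()
restarted.replay(tp.log)
print(restarted.tables == rdb.tables)
```

The toy tickerplant holds only the log and the subscriber list, which is why, in this model as in the text above, inspecting the tickerplant's tables shows nothing.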

A toy feedhandler

Normally, you would not be .u.upding the data manually onto the tickerplant. This is a job for the feedhandler. We can .u.upd the data from Python using the exxeleron qPython library. It can be installed using

pip install qpython

Let us write the following toy feedhandler, which doesn't, however, listen to any feeds:

import numpy as np

from qpython import qconnection
from qpython.qcollection import qlist
from qpython.qtype import QBOOL_LIST, QDOUBLE_LIST, QINT_LIST, QLONG_LIST, QSYMBOL_LIST

with qconnection.QConnection(host='localhost', port=5010) as q:
    q.sendSync('.u.upd', np.string_('trade'), [
               qlist(['IBM'], qtype=QSYMBOL_LIST),
               qlist([56.14], qtype=QDOUBLE_LIST),
               qlist([89], qtype=QINT_LIST),
               qlist([False], qtype=QBOOL_LIST),
               'Z',
               'N'])
    q.sendSync('.u.upd', np.string_('quote'), [
               qlist(['AAPL', 'MSFT'], qtype=QSYMBOL_LIST),
               qlist([99.12, 22.39], qtype=QDOUBLE_LIST),
               qlist([100.12, 23.40], qtype=QDOUBLE_LIST),
               qlist([29, 31], qtype=QLONG_LIST),
               qlist([25, 25], qtype=QLONG_LIST),
               'AA',
               'NN'])

Let's run it. We'll see new rows of data appear in the rdb (via the tickerplant, where these rows don't stay).

Launching the hdb

To launch the historical database (hdb), use the following command:

q example -p 5012

The hdb will look for the example directory (created by the rdb), in which it expects to find the on-disc tables.

The hdb will be started on port 5012 (the -p 5012 command line argument).

Note that the first data will appear in the hdb after midnight: today's data is kept in memory on the rdb, whereas yesterday's and earlier data is in the hdb.

The publish-subscribe pattern

Publish-subscribe is a messaging pattern wherein the senders of messages, called publishers, do not program the messages to be sent directly to specific receivers. Instead, the publishers categorize messages into classes (or topics) without knowledge of which receivers (called subscribers in this context), if any, there may be, and publish these messages through an appropriate mechanism.

Subscribers express interest in (subscribe to) one or more topics, and only receive messages that are of interest, without knowledge of which publishers, if any, there are.

A close relative to a message queue, a message topic provides a lightweight mechanism to broadcast asynchronous event notifications, and endpoints that allow software components to connect to the topic in order to send and receive those messages.

Unlike message queues, which batch messages until they are retrieved, message topics transfer messages with no or very little queueing, and push them out immediately to all subscribers.

All components that subscribe to the topic will receive every message that is broadcast, unless a message filtering policy is set by the subscriber.
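
The contrast between a message queue and a message topic can be sketched in a few lines of Python. The classes MessageQueue and Topic below are in-process toys invented for this illustration, not wrappers around any messaging product: the queue holds each message until a single receiver retrieves it, whereas the topic pushes each message immediately to every subscriber whose optional filter accepts it.

```python
from collections import deque

class MessageQueue:
    """Each message is held until retrieved and is consumed by exactly one receiver."""

    def __init__(self):
        self._messages = deque()

    def send(self, message):
        self._messages.append(message)

    def receive(self):
        return self._messages.popleft() if self._messages else None

class Topic:
    """Each message is pushed at once to every subscriber whose filter accepts it."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, callback, message_filter=None):
        self._subscribers.append((callback, message_filter))

    def publish(self, message):
        for callback, message_filter in self._subscribers:
            if message_filter is None or message_filter(message):
                callback(message)

inbox_all, inbox_xbt = [], []
topic = Topic()
topic.subscribe(inbox_all.append)                                     # no filter
topic.subscribe(inbox_xbt.append, lambda m: m['sym'] == 'XBTUSD')     # filtered
topic.publish({'sym': 'XBTUSD', 'price': 35508.5})
topic.publish({'sym': 'ETHUSD', 'price': 2200.0})
print(len(inbox_all), len(inbox_xbt))

queue = MessageQueue()
queue.send('m1')
print(queue.receive(), queue.receive())
```

The filtered subscriber plays the same role as a kdb+tick subscriber that asks for specific syms only.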

The following diagram from Amazon Web Services (AWS) illustrates the publish-subscribe pattern:

Publish subscribe.png

Kdb+/q supports the publish-subscribe pattern via kdb+tick. Kdb+/q is not alone in this. Technologies, such as

were designed primarily for message queueing use cases, whereas

were designed primarily to support publish-subscribe use cases. Other, often newer, solutions, such as Apache Pulsar, provide support for both message queueing and publish-subscribe.

Some of these technologies, notably Apache Kafka, can be adapted to work with kdb+/q (check https://github.com/KxSystems).

Subscribing to tickerplant updates from kdb+/q

Start a new kdb+/q instance. Evaluate the following code:

q)\c 1000 1000

This should ensure that the output doesn't get truncated and is readable in the console.

Open a socket connection to the tickerplant:

q)h:hopen`:localhost:5010

Let's define the following function:

q)upd:{[x;y]0N!raze"received update on table ",string[x],":";0N!y}

This function will be called when updates arrive. Here we are simply outputting to the console the name of the table (x) on which the update has arrived and the update itself (y).

Now subscribe to updates on all syms (`) to table `trade:

h(".u.sub";`trade;`)

We should then see the updates displayed by the function upd as we have defined it. Thus

q).u.upd[`trade;(enlist`IBM;enlist 56.14;enlist 89i;enlist 0b;enlist"Z";enlist"N")]
q).u.upd[`trade;(enlist`IBM;enlist 57.53;enlist 35i;enlist 0b;enlist"Z";enlist"N")]

on the tickerplant should result in

q)"received update on table trade:"
+`time`sym`price`size`stop`cond`ex!(,0D23:52:09.339540000;,`IBM;,56.14;,89i;,0b;,"Z";,"N")
"received update on table trade:"
+`time`sym`price`size`stop`cond`ex!(,0D23:54:04.053692000;,`IBM;,57.53;,35i;,0b;,"Z";,"N")

on our subscribing instance.

Subscribing to tickerplant updates from Python

Let us see how we can subscribe to tickerplant updates from Python using the exxeleron qPython library.

import numpy
import threading
import sys

from qpython import qconnection
from qpython.qtype import QException
from qpython.qconnection import MessageType
from qpython.qcollection import QTable

class ListenerThread(threading.Thread):
    def __init__(self, q):
        super(ListenerThread, self).__init__()
        self.q = q
        self._stopper = threading.Event()

    def stopit(self):
        self._stopper.set()

    def stopped(self):
        return self._stopper.is_set()

    def run(self):
        while not self.stopped():
            print('.')
            try:
                message = self.q.receive(data_only = False, raw = False) # retrieve entire message

                if message.type != MessageType.ASYNC:
                    print('Unexpected message, expected message of type: ASYNC')

                print('type: %s, message type: %s, data size: %s, is_compressed: %s ' % (type(message), message.type, message.size, message.is_compressed))

                if isinstance(message.data, list):
                    # unpack upd message
                    if len(message.data) == 3 and message.data[0] == b'upd' and isinstance(message.data[2], QTable):
                        for row in message.data[2]:
                            print(row)

            except QException as e:
                print(e)

if __name__ == '__main__':
    with qconnection.QConnection(host = 'localhost', port = 5010) as q:
        print(q)
        print('IPC version: %s. Is connected: %s' % (q.protocol_version, q.is_connected()))
        print('Press <ENTER> to close application')

        # subscribe to tick
        print(dir(q))
        response = q.sendSync('.u.sub', numpy.string_(''), numpy.string_(''))
        # get table model
        if isinstance(response[1], QTable):
            print('%s table data model: %s' % (response[0], response[1].dtype))

        t = ListenerThread(q)
        t.start()

        sys.stdin.readline()

        t.stopit()

The BitMEX feedhandler

We will now write a real (rather than toy) feedhandler in Python for the BitMEX exchange using the WebSocket protocol.

First, we need to

pip install websocket_client

The websocket API is very straightforward and uses four callbacks: on_open(), on_close(), on_message(), and on_error().

We start with the following skeleton for our feedhandler:

import logging
import sys
import websocket

try:
    import thread
except ImportError:
    import _thread as thread

def on_message(ws, message):
    print(message)

def on_error(ws, error):
    print(error)
    sys.exit(1)

def on_open(ws):
    print('Opened WebSocket connection')

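def on_close(ws, close_status_code=None, close_msg=None):
    # websocket-client 1.0 and later pass the close status code and message;
    # the defaults keep this callback compatible with older releases
    print('Closed WebSocket connection')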

if __name__ == "__main__":
    print('Opening WebSocket connection')
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://www.bitmex.com/realtime?subscribe=orderBook10:XBTUSD,trade:XBTUSD",
            on_message = on_message,
            on_error = on_error,
            on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()

We use our skeleton feedhandler to sample some trade messages:

{"table":"trade","action":"insert","data":[{"timestamp":"2021-06-19T23:12:15.550Z","symbol":"XBTUSD","side":"Buy","size":5700,"price":35508.5,"tickDirection":"ZeroPlusTick","trdMatchID":"8c35aab7-2225-b52b-55f5-d824c2f240b5","grossValue":16052511,"homeNotional":0.16052511,"foreignNotional":5700}]}

and some order book update messages:

{"table":"orderBook10","action":"update","data":[{"symbol":"XBTUSD","bids":[[35579.5,212300],[35574,248000],[35573,65000],[35572,23500],[35571.5,176300],[35570.5,23500],[35570,83000],[35569.5,155700],[35569,10700],[35568.5,62500]],"timestamp":"2021-06-19T23:13:56.340Z","asks":[[35580,82300],[35583,6300],[35584,1500],[35584.5,400],[35586.5,100],[35588.5,600],[35589,2600],[35589.5,500],[35590,10000],[35590.5,209200]]}]}

Looking at these messages it is easy to come up with table schemas, which we save in C:\q\tick\bitmex.q:

quote:([]time:`timespan$(); sym:`g#`symbol$(); feedhandlerTime:`timestamp$(); exchangeTime:`timestamp$(); bid1:`float$(); bid2:`float$(); bid3:`float$(); bid4:`float$(); bid5:`float$(); bid6:`float$(); bid7:`float$(); bid8:`float$(); bid9:`float$(); bid10:`float$(); ask1:`float$(); ask2:`float$(); ask3:`float$(); ask4:`float$(); ask5:`float$(); ask6:`float$(); ask7:`float$(); ask8:`float$(); ask9:`float$(); ask10:`float$(); bidSize1:`float$(); bidSize2:`float$(); bidSize3:`float$(); bidSize4:`float$(); bidSize5:`float$(); bidSize6:`float$(); bidSize7:`float$(); bidSize8:`float$(); bidSize9:`float$(); bidSize10:`float$(); askSize1:`float$(); askSize2:`float$(); askSize3:`float$(); askSize4:`float$(); askSize5:`float$(); askSize6:`float$(); askSize7:`float$(); askSize8:`float$(); askSize9:`float$(); askSize10:`float$())
trade:([]time:`timespan$(); sym:`g#`symbol$(); feedhandlerTime:`timestamp$(); exchangeTime:`timestamp$(); side:`symbol$(); size:`float$(); price:`float$(); tickDirection:`symbol$(); trdMatchID:(); grossValue:`float$(); homeNotional:`float$(); foreignNotional:`float$())
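
The mapping from an orderBook10 message to the quote schema can be sketched with the standard library alone. The function flatten_order_book is a hypothetical helper written for this illustration: it emits the values in the column order of the schema above, except for the leading time column, which the tickerplant adds itself. The sample message is the one shown earlier, truncated to three levels per side for brevity, so the resulting row is shorter than a full ten-level row.

```python
import datetime as dt
import json

def flatten_order_book(datum, feedhandler_time):
    """Flatten one orderBook10 datum into schema order (without the time column):
    sym, feedhandlerTime, exchangeTime, bids, asks, bid sizes, ask sizes."""
    exchange_time = dt.datetime.strptime(datum['timestamp'], '%Y-%m-%dT%H:%M:%S.%fZ')
    bids, asks = datum['bids'], datum['asks']
    row = [datum['symbol'], feedhandler_time, exchange_time]
    row += [float(price) for price, _ in bids]
    row += [float(price) for price, _ in asks]
    row += [float(size) for _, size in bids]
    row += [float(size) for _, size in asks]
    return row

# The sample message from above, truncated to three levels per side.
message = ('{"table":"orderBook10","action":"update","data":[{"symbol":"XBTUSD",'
           '"bids":[[35579.5,212300],[35574,248000],[35573,65000]],'
           '"timestamp":"2021-06-19T23:13:56.340Z",'
           '"asks":[[35580,82300],[35583,6300],[35584,1500]]}]}')

parsed = json.loads(message)
rows = [flatten_order_book(datum, dt.datetime.now()) for datum in parsed['data']]
print(rows[0])
```

All prices and sizes are converted to floats because the schema declares every bid, ask, and size column as a float.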

We then launch the tickerplant as

q.exe tick.q bitmex . -p 5010

the rdb as

q tick/r.q -p 5011

and the hdb as

q bitmex -p 5012

(in that order).

We complete the implementation of our feedhandler as follows:

import argparse
import datetime as dt
import json
import logging
import os
import sys
import time
import websocket

try:
    import thread
except ImportError:
    import _thread as thread

q = None

last_message_datetime = None
message_count = 0

def on_message(ws, message):
    global last_message_datetime
    global message_count
    global q
    try:
        if q is not None:
            import numpy as np
            from qpython.qcollection import qlist
            from qpython.qtemporal import array_to_raw_qtemporal
            from qpython.qtype import QDOUBLE_LIST, QSTRING_LIST, QSYMBOL_LIST, QTIMESTAMP_LIST
            o = json.loads(message)
            if 'table' in o:
                if o['table'] == 'orderBook10':
                    for datum in o['data']:
                        exchange_timestamp = datum['timestamp']   # E.g. 2019-05-09T05:47:29.457Z
                        python_exchange_datetime = dt.datetime.strptime(exchange_timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
                        symbol = datum['symbol']
                        data = [
                                qlist([symbol], qtype=QSYMBOL_LIST),
                                qlist([np.datetime64(dt.datetime.now(), 'ns')], qtype=QTIMESTAMP_LIST),
                                qlist([np.datetime64(python_exchange_datetime, 'ns')], qtype=QTIMESTAMP_LIST)
                            ]
                        bids = datum['bids']
                        asks = datum['asks']
                        # Each level is a [price, size] pair. The columns must be appended in
                        # schema order: bid prices, ask prices, bid sizes, ask sizes.
                        for bid, bid_size in bids:
                            data.append(qlist([bid], qtype=QDOUBLE_LIST))
                        for ask, ask_size in asks:
                            data.append(qlist([ask], qtype=QDOUBLE_LIST))
                        for bid, bid_size in bids:
                            data.append(qlist([bid_size], qtype=QDOUBLE_LIST))
                        for ask, ask_size in asks:
                            data.append(qlist([ask_size], qtype=QDOUBLE_LIST))
                        q.sendSync('.u.upd', np.string_('quote'), data)
                elif o['table'] == 'trade':
                    for datum in o['data']:
                        exchange_timestamp = datum['timestamp']   # E.g. 2019-05-09T05:47:29.457Z
                        python_exchange_datetime = dt.datetime.strptime(exchange_timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
                        symbol = datum['symbol']
                        side = datum['side']
                        size = datum['size']
                        price = datum['price']
                        tickDirection = datum['tickDirection']
                        trdMatchID = datum['trdMatchID']
                        grossValue = datum['grossValue']
                        homeNotional = datum['homeNotional']
                        foreignNotional = datum['foreignNotional']
                        q.sendSync('.u.upd', np.string_('trade'), [
                                qlist([symbol], qtype=QSYMBOL_LIST),
                                qlist([np.datetime64(dt.datetime.now(), 'ns')], qtype=QTIMESTAMP_LIST),
                                qlist([np.datetime64(python_exchange_datetime, 'ns')], qtype=QTIMESTAMP_LIST),
                                qlist([side], qtype=QSYMBOL_LIST),
                                qlist([size], qtype=QDOUBLE_LIST),
                                qlist([price], qtype=QDOUBLE_LIST),
                                qlist([np.string_(tickDirection)], qtype=QSYMBOL_LIST),
                                qlist([trdMatchID], qtype=QSTRING_LIST),
                                qlist([grossValue], qtype=QDOUBLE_LIST),
                                qlist([homeNotional], qtype=QDOUBLE_LIST),
                                qlist([foreignNotional], qtype=QDOUBLE_LIST)
                            ])
        last_message_datetime = dt.datetime.now()
        message_count += 1
    except Exception as e:
        logging.error(e)
        sys.exit(1)

def on_error(ws, error):
    logging.error(error)
    sys.exit(1)

def on_open(ws):
    logging.info('Opened WebSocket connection')
    def run(*args):
        global last_message_datetime
        global message_count
        while True:
            time.sleep(60)
            if last_message_datetime is None or (dt.datetime.now() - last_message_datetime).total_seconds() > 60.:
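                print('!!! Have not seen any messages for one minute, exiting')
                # Closing the socket makes ws.run_forever() return in the main thread;
                # sys.exit() below only ends this watchdog thread.
                ws.close()
                sys.exit()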
            else:
                print(dt.datetime.now(), '...received %d messages...' % message_count)
    thread.start_new_thread(run, ())

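# Newer websocket-client releases also pass a close status code and message,
# so accept (and ignore) any extra positional arguments.
def on_close(ws, *args):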
    print('Closed WebSocket connection')

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='BitMEX feedhandler')
    parser.add_argument('--host', help='kdb+/q tickerplant host', action='store', dest='host', default='localhost')
    parser.add_argument('--port', help='kdb+/q tickerplant port', action='store', dest='port', type=int, default=5010)
    parser.add_argument('--console-log-level', help='console log level: 10=DEBUG, 20=INFO, 30=WARN, 40=ERROR, 50=CRITICAL', action='store', dest='console_log_level', type=int, default=20)
    parser.add_argument('--file-log-level', help='file log level: 10=DEBUG, 20=INFO, 30=WARN, 40=ERROR, 50=CRITICAL', action='store', dest='file_log_level', type=int, default=20)
    parser.add_argument('--log-file', help='path to log file', action='store', dest='log_file', default=None)
    args = parser.parse_args()

    if args.log_file is not None:
        logging.basicConfig(
                filename=args.log_file,
                level=args.file_log_level,
                format='%(asctime)s %(pathname)s:%(lineno)d %(levelname)s %(message)s'
            )
        console = logging.StreamHandler()
        console.setLevel(args.console_log_level)
        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
        console.setFormatter(formatter)
        logging.getLogger('').addHandler(console)
    else:
        logging.basicConfig(
                level=args.console_log_level,
                format='%(asctime)s %(levelname)s %(message)s'
            )

    logging.info('Starting BitMEX Feedhandler')

    logging.debug('host: %s' % args.host)
    logging.debug('port: %s' % args.port)
    logging.debug('console-log-level: %s' % args.console_log_level)
    logging.debug('file-log-level: %s' % args.file_log_level)
    logging.debug('log-file: %s' % args.log_file)

    logging.info('Connecting to tickerplant on host=%s, port=%d' % (args.host, args.port))
    from qpython import qconnection
    q = qconnection.QConnection(host=args.host, port=args.port)
    q.open()

    logging.info('Opening WebSocket connection')
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://www.bitmex.com/realtime?subscribe=orderBook10:XBTUSD,trade:XBTUSD",
            on_message = on_message,
            on_error = on_error,
            on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()

We are now ready to launch the feedhandler:

python bitmex_feedhandler.py

Watch the tables on the rdb populate:

Bitmex rdb.png
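
If the quote table fills with values in the wrong columns, the first thing to check is the order in which the feedhandler flattens each orderBook10 message. The following sketch reproduces that flattening in plain Python, without qpython, so that it can be checked in isolation. The function names, the `levels` parameter and the sample message are our own and are made up for illustration; the tickerplant is assumed to prepend the `time` column itself.

```python
import datetime as dt


def parse_exchange_time(timestamp):
    """Parse a BitMEX UTC timestamp such as 2019-05-09T05:47:29.457Z."""
    return dt.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')


def flatten_book(datum, received_at, levels=10):
    """Return one quote row in schema order, without the leading time column:
    sym, feedhandlerTime, exchangeTime, bid1..N, ask1..N, bidSize1..N, askSize1..N.
    """
    bids = datum['bids'][:levels]
    asks = datum['asks'][:levels]
    if len(bids) != levels or len(asks) != levels:
        # A shallower book would shift every later column, so fail loudly.
        raise ValueError('expected %d levels on each side' % levels)
    row = [datum['symbol'], received_at, parse_exchange_time(datum['timestamp'])]
    row += [float(price) for price, _ in bids]
    row += [float(price) for price, _ in asks]
    row += [float(size) for _, size in bids]   # sizes arrive as integers
    row += [float(size) for _, size in asks]
    return row


# Hypothetical two-level message, invented for this example.
sample = {
    'symbol': 'XBTUSD',
    'timestamp': '2019-05-09T05:47:29.457Z',
    'bids': [[6000.0, 100], [5999.5, 250]],
    'asks': [[6000.5, 80], [6001.0, 40]],
}
received = dt.datetime(2019, 5, 9, 5, 47, 29, 500000)
row = flatten_book(sample, received, levels=2)
print(row)
```

Note that the exchange timestamp is in UTC (the trailing Z), whereas `dt.datetime.now()` in the feedhandler returns local time, so feedhandlerTime and exchangeTime are only directly comparable on a machine whose clock is set to UTC.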

Visualization tool

Finally, we prototype a (very basic) visualization tool for our data, based on the Python tickerplant subscriber introduced above. Here is the code, which can be improved in many ways:

import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import numpy as np
import threading

from qpython import qconnection
from qpython.qtype import QException
from qpython.qconnection import MessageType
from qpython.qcollection import QTable

class ListenerThread(threading.Thread):
    def __init__(self, q, data_class):
        super(ListenerThread, self).__init__()
        self.q = q
        self._data_class = data_class
        self._stopper = threading.Event()

    def stopit(self):
        self._stopper.set()

    def stopped(self):
        return self._stopper.is_set()

    def run(self):
        while not self.stopped():
            print('.')
            try:
                message = self.q.receive(data_only = False, raw = False) # retrieve entire message

                if message.type != MessageType.ASYNC:
                    print('Unexpected message, expected message of type: ASYNC')

                print('type: %s, message type: %s, data size: %s, is_compressed: %s ' % (type(message), message.type, message.size, message.is_compressed))

                if isinstance(message.data, list):
                    # unpack upd message
                    if len(message.data) == 3 and message.data[0] == b'upd' and isinstance(message.data[2], QTable):
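                        for row in message.data[2]:
                            print(row)
                            # trade columns: 0 time, 1 sym, 2 feedhandlerTime, 3 exchangeTime,
                            # 4 side, 5 size, 6 price; we plot price against feedhandlerTime
                            print(row[2], row[6])
                            self._data_class.x_data.append(row[2])
                            self._data_class.y_data.append(row[6])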

            except QException as e:
                print(e)

class DataClass():
    def __init__(self):
        self.x_data = []
        self.y_data = []

class PlotClass():
    def __init__(self, data_class):
        self._data_class = data_class
        self.line, = plt.plot([], [], 'o')
        self.ani = FuncAnimation(plt.gcf(), self.run, interval=1000, repeat=True)

    def run(self, i):  
        print("plotting data")
        self.line.set_data(self._data_class.x_data, self._data_class.y_data)
        self.line.axes.relim()
        self.line.axes.autoscale_view()

with qconnection.QConnection(host = 'localhost', port = 5010) as q:
    print(q)
    print('IPC version: %s. Is connected: %s' % (q.protocol_version, q.is_connected()))
    print('Close the plot window to exit')
    # subscribe to tick
    response = q.sendSync('.u.sub', np.string_('trade'), np.string_(''))
    # get table model
    if isinstance(response[1], QTable):
        print('%s table data model: %s' % (response[0], response[1].dtype))
    data = DataClass()
    plotter = PlotClass(data)
    t = ListenerThread(q, data)
    t.daemon = True  # do not keep the process alive once the plot window is closed
    t.start()
    plt.show()  # blocks until the plot window is closed
    t.stopit()  # ask the listener to stop after its current receive
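
One of the possible improvements concerns the data shared between the two threads. The listener appends to `x_data` and then to `y_data`, while the animation callback reads both lists, so the plot may occasionally see lists of different lengths, and the lists grow without bound. The sketch below shows one way to address both points with a lock and an optional size cap. The class name `SeriesBuffer` and the `max_points` parameter are our own; this is an illustration rather than part of the tool above.

```python
import threading


class SeriesBuffer:
    """Holds (x, y) points appended by one thread and read by another."""

    def __init__(self, max_points=None):
        # max_points, if given, should be a positive integer.
        self._lock = threading.Lock()
        self._x = []
        self._y = []
        self._max_points = max_points

    def append(self, x, y):
        # Both lists are updated under the lock, so they never differ in length.
        with self._lock:
            self._x.append(x)
            self._y.append(y)
            if self._max_points is not None and len(self._x) > self._max_points:
                # Drop the oldest points so that memory use stays bounded.
                del self._x[:-self._max_points]
                del self._y[:-self._max_points]

    def snapshot(self):
        # Return copies, so the caller can plot them without holding the lock.
        with self._lock:
            return list(self._x), list(self._y)


buf = SeriesBuffer(max_points=3)
for i in range(5):
    buf.append(i, 10 * i)
xs, ys = buf.snapshot()
print(xs, ys)
```

In the tool above, the listener would call `append(row[2], row[6])` in place of the two separate appends, and the animation callback would pass the result of `snapshot()` to `self.line.set_data`.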

And here is the result of running it for a few minutes: