Programming/Kdb/Labs/Feedhandler and kdb tick environment
Cryptocurrency
A cryptocurrency, crypto-currency, or crypto is a digital asset designed to work as a medium of exchange. Individual coin ownership records are stored in a ledger, a form of computerized database that uses strong cryptography to secure transaction records, to control the creation of additional coins, and to verify the transfer of coin ownership. A cryptocurrency typically does not exist in physical form (like paper money) and is typically not issued by a central authority. Cryptocurrencies typically use decentralized control, as opposed to centralized digital currencies and central banking systems. When a cryptocurrency is minted, or created prior to issuance, or issued by a single issuer, it is generally considered centralized. When implemented with decentralized control, each cryptocurrency works through distributed ledger technology, typically a blockchain, that serves as a public financial transaction database.
Bitcoin, first released as open-source software in 2009, is the first decentralized cryptocurrency. Since the release of bitcoin, other cryptocurrencies have been created.
Cryptocurrency exchanges
A cryptocurrency exchange, or a digital currency exchange (DCE), is a business that allows customers to trade cryptocurrencies or digital currencies for other assets, such as conventional fiat money or other digital currencies. Exchanges may accept credit card payments, wire transfers, or other forms of payment in exchange for digital currencies or cryptocurrencies. A cryptocurrency exchange can be a market maker that typically takes the bid-ask spread as a transaction commission for its service or, as a matching platform, simply charge fees.
Here is a list of some major cryptocurrency exchanges:
- Binance
- Binance Delivery
- Binance Futures
- Binance US
- Bitcoin.com
- Bitfinex
- bitFlyer
- Bithumb
- BitMax
- BitMEX
- Bitstamp
- Bittrex
- Blockchain.com
- Bybit
- Coinbase
- Deribit
- EXX
- FTX
- FTX US
- Gate.io
- Gemini
- HitBTC
- Huobi
- Huobi DM
- Huobi Swap
- Kraken
- Kraken Futures
- KuCoin
- OKCoin
- OKEx
- Poloniex
- ProBit
- Upbit
APIs
API is an acronym for Application Programming Interface. In general, an API is an interface that allows two unrelated systems to interact with each other. An API specification describes the way that each of the two systems must interact with the API itself (such as the language that must be used, the syntax of messages that are passed back and forth, how frequently the messages can be sent, etc.).
Specifically for cryptocurrency trading, an API enables the client to interact with the exchange programmatically (via software instead of a human interface), allowing the client to obtain real-time market data, make trades, and manage the user's account.
A few of the tasks that can be accomplished via an API:
- download historical market data;
- stream real-time market data;
- place and cancel trading orders;
- view account balances;
- download trading history;
- make funding transactions...
Cryptocurrency exchange APIs are typically implemented using REST or the WebSocket protocol.
REST
Representational state transfer (REST) is a software architectural style that was created to guide the design and development of the architecture for the World Wide Web. REST defines a set of constraints for how the architecture of an Internet-scale distributed hypermedia system, such as the Web, should behave. The REST architectural style emphasizes the scalability of interactions between components, uniform interfaces, independent deployment of components, and the creation of a layered architecture to facilitate caching components to reduce user-perceived latency, enforce security, and encapsulate legacy systems.
A web service that obeys the REST constraints is informally described as RESTful. Such a web service must provide its web resources in a textual representation and allow them to be read and modified with a stateless protocol and a predetermined set of operations. This approach allows the greatest interoperability between clients and servers in a long-lived Internet-scale environment which crosses organisational boundaries.
In a RESTful web service, requests made to a resource's URI elicit a response with a payload formatted in HTML, XML, JSON, or some other format. For example, the response can confirm that the resource state has been changed. The response can include hypertext links to related resources. The most common protocol for these requests and responses is HTTP. It provides operations (HTTP methods) such as GET, POST, PUT, and DELETE. By using a stateless protocol and standard operations, RESTful systems aim for fast performance, reliability, and the ability to grow by reusing components that can be managed and updated without affecting the system as a whole, even while it is running.
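To make this concrete, here is a minimal sketch of a RESTful request issued from a q session. It assumes a kdb+ version that provides the .Q.hg (HTTP GET) utility and is built with SSL support for https; the BitMEX endpoint shown is purely illustrative and is an assumption, not something defined elsewhere on this page:

q)trades:.j.k .Q.hg"https://www.bitmex.com/api/v1/trade?symbol=XBTUSD&count=5"  / GET, then parse the JSON payload
q)count trades                                                                  / number of trade records returned

Each request is self-contained and the connection carries no state between calls, which is exactly the property that distinguishes REST from the WebSocket feeds used later on this page.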
WebSocket
WebSocket is a computer communications protocol, providing full-duplex communication channels over a single TCP connection. The WebSocket protocol was standardized by the IETF as RFC 6455 in 2011, and the WebSocket API in Web IDL is being standardized by the W3C.
WebSocket is distinct from HTTP. Both protocols are located at layer 7 in the OSI model and depend on TCP at layer 4. Although they are different, RFC 6455 states that WebSocket
is designed to work over HTTP ports 443 and 80 as well as to support HTTP proxies and intermediaries,
thus making it compatible with HTTP. To achieve compatibility, the WebSocket handshake uses the HTTP Upgrade header to change from the HTTP protocol to the WebSocket protocol.
Schematically the WebSocket connection can be represented as follows:
REST versus WebSocket
Priya Pademkar lists some differences between the REST and WebSocket APIs:
- WebSocket uses HTTP only to establish the initial connection, whereas HTTP is the common protocol in RESTful web services.
- WebSocket communication is based on the socket concept, whereas REST is based on the resources concept rather than commands.
- WebSocket relies on an IP address and port number, whereas REST is based on the HTTP protocol and uses HTTP methods to relay data.
- The cost of communication is lower for WebSocket than for REST.
- WebSocket is better with high loads, whereas REST is more commonly employed for occasional communication.
- WebSocket is a stateful protocol, whereas REST is based on HTTP, which is a stateless protocol.
The following diagram by Emmanuel Picard emphasizes the differences between REST and event-driven APIs (such as those based on WebSocket):
Few cryptocurrency exchanges use FIX or other specialized financial protocols, but for completeness we mention them here.
The Financial Information eXchange (FIX) protocol is an electronic communications protocol initiated in 1992 for international real-time exchange of information related to securities transactions and markets. With trillions of dollars traded annually on the NASDAQ alone, financial service entities are employing direct market access (DMA) to increase their speed to financial markets. Managing the delivery of trading applications and keeping latency low increasingly requires an understanding of the FIX protocol.
The following diagram by Gareth Randall shows how FIX messaging looks between Buyside/Customer and Sellside/Supplier:
Other finance-specific protocols include FIX Adapted for STreaming (FAST), ITCH, and OUCH. See this page for more details.
Kdb+tick
Kdb+tick is an architecture which allows the capture, processing, and querying of real-time and historical data. It can be used to capture, store, and analyze massive volumes of data in real-time.
It is shipped in the form of a q library and consists of three q files:
- tick.q — the tickerplant process script;
- tick/r.q — the vanilla real-time subscriber process, known as the real-time database (rdb);
- tick/u.q — loaded by tick.q, contains the definitions of the .u functions: .u.init, .u.del, .u.sel, .u.pub, .u.add, .u.sub, and .u.end.
Kdb+tick can be downloaded from https://github.com/KxSystems/kdb-tick.
Installation
To install kdb+tick, place these files under the QHOME directory (e.g. C:/q), so you end up with:
- .../q
  - tick
    - r.q
    - u.q
  - w64 (or w32, or m32, or ..., depending on your OS and whether you are using 32-bit or 64-bit kdb+/q)
    - ...
  - q.k
  - q.q
  - README.txt
  - s.k
  - sp.q
  - tick.q
  - trade.q
Architecture
The architecture of a kdb+/q system is as follows. (The diagram is reproduced from Building real-time tick subscribers by Nathan Perrem, Kx.)
The components are as follows:
- The data feed — a stream of data updates from a data source, e.g. order book updates and trades from the BitMEX exchange.
- The feedhandler (fh) — a process that captures the data from the data feed and feeds it into the tickerplant.
- The tickerplant (tp) — a kdb+/q instance that publishes data records to the real-time database and other real-time subscribers. Constitutes the first element of the kdb+tick triplet.
- The real-time database (rdb) — a kdb+/q instance that subscribes to data records on the tickerplant and stores them in an in-memory table. Today's data can be queried on the rdb. The rdb constitutes the second element of the kdb+tick triplet.
- The historical database (hdb) — a kdb+/q instance that stores the data up to and including yesterday in a splayed and partitioned (by date) table on disc. Can be queried. Constitutes the third element of the kdb+tick triplet.
- The real-time subscriber (rts) — a kdb+/q instance or another process that subscribes to data records on the tickerplant. This process may compute certain metrics, such as the volume weighted average price (VWAP), visualize the data, etc. Optional.
- The log file — a binary file containing the data for the current date, written by the tp. Used to restore the rdb should the rdb process crash or require restarting for whatever reason.
The sequence of events
The sequence of events is as follows:
- The tickerplant tick.q starts.
  - Load the .q file that contains the table schemas.
  - Load u.q, which contains the definitions of the .u functions: .u.init, .u.del, .u.sel, .u.pub, .u.add, .u.sub, .u.end.
  - Define the tickerplant-specific functions:
    - .u.endofday — handles the end-of-day saving of the data and the starting of the new log file.
    - .z.ts override — publishes updates to subscribers in a timed loop.
    - .u.upd — accepts data updates from feedhandlers, adding the time column where needed and buffering for later publishing.
  - .u.tick[]
    - Create an empty subscriber list.
    - Check that all tables have time/sym columns, apply the g# attribute.
    - Open the log file.
  - Wait for .u.upd updates to be received, buffer them, and publish to subscribers on a timed loop.
- The real-time database tick/r.q starts.
  - Define the rdb-specific functions:
    - .u.end — save the rdb contents to the hdb directory at end of day.
    - .u.rep — replay the log file, then change to the hdb directory.
    - .u.upd — accept updates by inserting into the in-memory table.
  - Keep appending updates to in-memory tables until another command is received.
  - Answer queries as and when received.
- The historical database starts.
  - Load the directory as an on-disc database.
  - Answer queries as and when received.
The tickerplant
Two particularly important functions are defined on the tickerplant:
- .u.upd[tableName;tableData] inserts tableData into the table specified by tableName. This function is normally called from a feedhandler. It takes the tableData, adds a time column if one is absent, inserts it into the in-memory table, appends to the log file, and increases the counter of log file records .u.i. For example:
.u.upd[`quote;(`AAPL`MSFT;99.12 22.39;100.12 23.40;29 31;25 35;"A","A";"N","N")]
- .u.sub[tableName;listOfSyms] subscribes to the syms specified by listOfSyms in table tableName. Once this function has been called, the remote process, via the upd function, will receive any updates on tableName for the syms in listOfSyms. An empty symbol ` for any argument stands for "all": it subscribes to data for all tables and/or syms. For example:
h:hopen`:localhost:5010;
upd:{[x;y]0N!raze"received update on table ",string[x],":";0N!y};
h(".u.sub";`trade;`XBTUSD`EURUSD)
- This will subscribe to updates on symbols `XBTUSD and `EURUSD in table `trade; to subscribe to updates on all symbols in that table, use
h(".u.sub";`trade;`)
The following important variables are defined on a tickerplant:
- .u.t:`quote`trade — tables that exist
- .u.d:2021.06.01 — the date
- .u.l:636i — log file handle
- .u.L:`:./bitmex2021.06.01 — path to log file
- .u.i:98720 — number of messages in the log file
- .u.w: contains a dictionary from table names to a list of lists of the form (handle;subscribed syms). For example:
quote| ,(680i;`)
trade| ((680i;`);(984i;`XBTUSD`EURUSD))
- The example shows a subscription from the rdb to the quote table for all syms (`) on socket handle 680i, a subscription from the rdb to the trade table for all syms (`) on the same handle 680i, and a subscription from a process on socket handle 984i to the trade table for syms `XBTUSD and `EURUSD.
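These variables can be inspected directly in the tickerplant's console (or over an IPC handle) while the system is running; for example:

q).u.i    / number of messages written to the log file so far
q).u.L    / path to the current log file
q).u.w    / subscriptions: table name -> list of (handle;syms) pairs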
Setting up kdb+tick
Create the file C:/q/tick/example.q (replacing C:/q with QHOME on your machine) with the following contents:
quote:([]time:`timespan$(); sym:`g#`symbol$(); bid:`float$(); ask:`float$(); bsize:`long$(); asize:`long$(); mode:`char$(); ex:`char$())
trade:([]time:`timespan$(); sym:`g#`symbol$(); price:`float$(); size:`int$(); stop:`boolean$(); cond:`char$(); ex:`char$())
This file defines the schemas for the quote and trade tables.
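As a quick sanity check (not part of the kdb+tick scripts themselves), the schema file can be loaded into a plain q session started in the QHOME directory and its table definitions inspected:

q)\l tick/example.q
q)meta quote
q)meta trade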
Launching the tickerplant
To launch the tickerplant, go to the location of the q executable (on Windows, q.exe) and launch it with the following command line:
cd C:/q/w64
q tick.q example . -p 5010
On Linux and MacOS, you may need to run the above as
./q tick.q example . -p 5010
tick.q will be loaded from the QHOME directory (in our case, C:/q). The command line tells the tickerplant to load the table schemas from tick/example.q (the example argument) and write the log file in the current directory (C:/q/w64, the . argument). This will start the tickerplant on port 5010 (the -p 5010 command line option).
Once the tickerplant is up and running, you should see the log file C:\q\w64\example2021.06.19 appear (the date in the file name will, of course, be the current date).
The tickerplant will not start unless the file with the table schemas, C:\q\tick\example.q, is in place. If it is missing, you may see the following error message:
'tick/example.q. OS reports: The system cannot find the file specified.
[3] \l tick/example.q
^
Launching the rdb
Make sure the tickerplant is running before you launch the rdb, since the rdb will attempt to connect to the tickerplant.
To launch the rdb, go to the location of the q executable (on Windows, q.exe), and launch it with the following command line:
cd C:/q/w64
q tick/r.q -p 5011
tick/r.q will be loaded from under the QHOME directory (in our case, C:/q). This will start the rdb on port 5011 (the -p 5011 command line option).
The rdb will attempt to connect to the tickerplant, which it looks for, by default, on port 5010. If the tickerplant is not found, the rdb will produce the following error message:
'hop. OS reports: No connection could be made because the target machine actively refused it.
[2] c:\q\tick\r.q:19: .u.rep .(hopen `$":",.u.x 0)"(.u.sub[`;`];`.u `i`L)";
^
[0] (<load>)
)
If you launched the tickerplant on a different port, say on 5008, you need to specify it as the command line parameter to tick/r.q as follows:
q tick/r.q :5008 -p 5011
The full syntax for specifying the location of the tickerplant and the hdb, respectively, is
q tick/r.q [host]:port[:usr:pwd] [host]:port[:usr:pwd]
By default, tick/r.q looks for the tickerplant on localhost:5010 and for the hdb on localhost:5012.
The hdb is optional. It may or may not be running.
We will assume that the tickerplant is running on localhost:5010.
Inspecting the tickerplant and the rdb
If we now open http://localhost:5011/ in the browser, we will see that the rdb has been initialized with the schemas from C:/q/tick/example.q, but the two tables are empty.
We see the same empty tables if we examine the tickerplant on http://localhost:5010/.
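The same check can be made programmatically. As a minimal sketch, open an IPC handle to the rdb from another q session and query it:

q)h:hopen`:localhost:5011
q)h"tables[]"     / `quote`trade
q)h"count trade"  / 0 until updates arrive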
Updating the tickerplant from kdb+/q
Now suppose that a trade and two quotes have arrived. We need to update the tickerplant. We run the following code on the tickerplant (either by connecting to the tickerplant from an IDE, such as Q Insight Pad, or by typing the code directly into the tickerplant's terminal window):
q).u.upd[`trade;(enlist`IBM;enlist 56.14;enlist 89i;enlist 0b;enlist"Z";enlist"N")]
q).u.upd[`quote;(`AAPL`MSFT;99.12 22.39;100.12 23.40;29 31;25 35;"AA";"NN")]
If we now inspect the rdb, http://localhost:5011/, from the browser, we'll see that the tables are no longer empty:
On the tickerplant, http://localhost:5010/, the two tables remain empty. This is because the tickerplant does not keep the data in its tables. It writes the data to the log file, C:/q/w64/example2021.06.19, and publishes it to the rdb. If the rdb is restarted (try it), the log file will be replayed, and the data will reappear in the rdb's tables. The log file preserves the integrity of the data, should the rdb crash.
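The replay mechanism can also be exercised by hand. As a minimal sketch (assuming the schema file and log file from above; the date in the log file name will differ on your machine), start a fresh q session in C:/q/w64, load the same schemas, define upd as insert (each logged message has the form (`upd;tableName;data)), and stream the log through the built-in -11! replay operator:

q)\l ../tick/example.q
q)upd:insert
q)-11!`:example2021.06.19
q)count trade

This is essentially what the rdb's .u.rep function does when the rdb starts up.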
A toy feedhandler
Normally, you would not call .u.upd manually on the tickerplant; that is the feedhandler's job. We can call .u.upd from Python using the exxeleron qPython library, which can be installed using
pip install qpython
Let us write the following toy feedhandler, which doesn't, however, listen to any feeds:
import numpy as np
from qpython import qconnection
from qpython.qcollection import qlist
from qpython.qtype import QBOOL_LIST, QDOUBLE_LIST, QINT_LIST, QLONG_LIST, QSYMBOL_LIST

with qconnection.QConnection(host='localhost', port=5010) as q:
    q.sendSync('.u.upd', np.string_('trade'), [
        qlist(['IBM'], qtype=QSYMBOL_LIST),
        qlist([56.14], qtype=QDOUBLE_LIST),
        qlist([89], qtype=QINT_LIST),
        qlist([False], qtype=QBOOL_LIST),
        'Z',
        'N'])
    q.sendSync('.u.upd', np.string_('quote'), [
        qlist(['AAPL', 'MSFT'], qtype=QSYMBOL_LIST),
        qlist([99.12, 22.39], qtype=QDOUBLE_LIST),
        qlist([100.12, 23.40], qtype=QDOUBLE_LIST),
        qlist([29, 31], qtype=QLONG_LIST),
        qlist([25, 25], qtype=QLONG_LIST),
        'AA',
        'NN'])
Let's run it. We'll see new rows of data appear in the rdb (via the tickerplant, where these rows don't stay).
Launching the hdb
To launch the historical database (hdb), use the following command:
cd C:/q/w64
q example -p 5012
The hdb will look for the example directory (created by the rdb), in which it expects to find the on-disc tables.
The hdb will be started on port 5012 (the -p 5012 command line argument).
Note that the first data will appear in the hdb after midnight: today's data is kept in memory on the rdb, whereas yesterday's and earlier data is in the hdb.
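Once at least one end-of-day rollover has taken place (so that the example directory contains a date partition), the hdb can be queried like any other kdb+ process. For example, from another q session:

q)h:hopen`:localhost:5012
q)h"select count i by date from trade"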
The publish-subscribe pattern
Publish-subscribe is a messaging pattern wherein the senders of messages, called publishers, do not program the messages to be sent directly to specific receivers. Instead, the publishers categorize messages into classes (or topics) without knowledge of which receivers, called subscribers in this context, if any, there may be, and publish these messages through an appropriate mechanism.
Subscribers express interest in (subscribe to) one or more topics, and only receive messages that are of interest, without knowledge of which publishers, if any, there are.
A close relative to a message queue, a message topic provides a lightweight mechanism to broadcast asynchronous event notifications, and endpoints that allow software components to connect to the topic in order to send and receive those messages.
Unlike message queues, which batch messages until they are retrieved, message topics transfer messages with no or very little queueing, and push them out immediately to all subscribers.
All components that subscribe to the topic will receive every message that is broadcast, unless a message filtering policy is set by the subscriber.
The following diagram from Amazon Web Services (AWS) illustrates the publish-subscribe pattern:
Kdb+/q supports the publish-subscribe pattern via kdb+tick. Kdb+/q is not alone in this. Technologies such as
- Apache ActiveMQ™ 5 "Classic";
- Apache ActiveMQ™ Artemis;
- Amazon Simple Queue Service (SQS);
- IBM® Websphere® MQ;
- RabbitMQ;
- Apache RocketMQ™;
were designed primarily for message queueing use cases, whereas
- Apache Kafka®;
- Google Cloud Pub/Sub;
- TIBCO Rendezvous®
were designed primarily to support publish-subscribe use cases. Other, often newer, solutions, such as Apache Pulsar, provide support for both message queueing and publish-subscribe.
Some of these technologies, notably Apache Kafka, can be adapted to work with kdb+/q (check https://github.com/KxSystems).
Subscribing to tickerplant updates from kdb+/q
Start a new q instance and evaluate the following code:
q)\c 1000 1000
This should ensure that the output doesn't get truncated and is readable in the console.
Open a socket connection to the tickerplant:
q)h:hopen`:localhost:5010
Let's define the following function:
q)upd:{[x;y]0N!raze"received update on table ",string[x],":";0N!y}
This function will be called when updates arrive. Here we are simply outputting to the console the name of the table (x) on which the update has arrived and the update itself (y).
Now subscribe to updates for all syms (<tt>`</tt>) on the <tt>trade</tt> table:
h(".u.sub";`trade;`)
We should then see the updates displayed by the <tt>upd</tt> function we have defined. Thus, evaluating
<pre>
q).u.upd[`trade;(enlist`IBM;enlist 56.14;enlist 89i;enlist 0b;enlist"Z";enlist"N")]
q).u.upd[`trade;(enlist`IBM;enlist 57.53;enlist 35i;enlist 0b;enlist"Z";enlist"N")]
</pre>
on the tickerplant should result in
q)"received update on table trade:" +`time`sym`price`size`stop`cond`ex!(,0D23:52:09.339540000;,`IBM;,56.14;,89i;,0b;,"Z";,"N") "received update on table trade:" +`time`sym`price`size`stop`cond`ex!(,0D23:54:04.053692000;,`IBM;,57.53;,35i;,0b;,"Z";,"N")
on our subscribing instance.
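With the standard kdb+tick <tt>u.q</tt>, the first argument of <tt>.u.sub</tt> is the table name (or the null symbol <tt>`</tt> for all tables) and the second is the list of syms to filter on (<tt>`</tt> for all). The call returns the table name together with its empty schema, which a subscriber such as the rdb uses to initialise its local copy of the table. A couple of variations, as a sketch reusing the handle <tt>h</tt> opened above:
<pre>
q)h(".u.sub";`trade;`IBM`MSFT)   / only updates for IBM and MSFT on trade
q)h(".u.sub";`;`)                / all tables, all syms; returns a list of (name;schema) pairs
</pre>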
==Subscribing to tickerplant updates from Python==
Let us see how we can subscribe to tickerplant updates from Python using the exxeleron qPython library.
<pre>
import numpy
import threading
import sys

from qpython import qconnection
from qpython.qtype import QException
from qpython.qconnection import MessageType
from qpython.qcollection import QTable


class ListenerThread(threading.Thread):

    def __init__(self, q):
        super(ListenerThread, self).__init__()
        self.q = q
        self._stopper = threading.Event()

    def stopit(self):
        self._stopper.set()

    def stopped(self):
        return self._stopper.is_set()

    def run(self):
        while not self.stopped():
            print('.')
            try:
                message = self.q.receive(data_only = False, raw = False)  # retrieve entire message

                if message.type != MessageType.ASYNC:
                    print('Unexpected message, expected message of type: ASYNC')

                print('type: %s, message type: %s, data size: %s, is_compressed: %s ' % (type(message), message.type, message.size, message.is_compressed))

                if isinstance(message.data, list):
                    # unpack upd message
                    if len(message.data) == 3 and message.data[0] == b'upd' and isinstance(message.data[2], QTable):
                        for row in message.data[2]:
                            print(row)
            except QException as e:
                print(e)


if __name__ == '__main__':
    with qconnection.QConnection(host = 'localhost', port = 5010) as q:
        print(q)
        print('IPC version: %s. Is connected: %s' % (q.protocol_version, q.is_connected()))
        print('Press <ENTER> to close application')

        # subscribe to tick
        print(dir(q))
        response = q.sendSync('.u.sub', numpy.string_(''), numpy.string_(''))

        # get table model
        if isinstance(response[1], QTable):
            print('%s table data model: %s' % (response[0], response[1].dtype))

        t = ListenerThread(q)
        t.start()

        sys.stdin.readline()
        t.stopit()
</pre>
=The BitMEX feedhandler=
We will now write a real (rather than toy) feedhandler in Python for the BitMEX exchange using the WebSocket protocol.
First, we need to
pip install websocket_client
The websocket API is very straightforward and uses four callbacks: on_open(), on_close(), on_message(), and on_error().
We start with the following skeleton for our feedhandler:
<pre>
import logging
import sys

import websocket

try:
    import thread
except ImportError:
    import _thread as thread


def on_message(ws, message):
    print(message)

def on_error(ws, error):
    print(error)
    sys.exit(1)

def on_open(ws):
    print('Opened WebSocket connection')

def on_close(ws):
    print('Closed WebSocket connection')


if __name__ == "__main__":
    print('Opening WebSocket connection')
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://www.bitmex.com/realtime?subscribe=orderBook10:XBTUSD,trade:XBTUSD",
                                on_message = on_message,
                                on_error = on_error,
                                on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()
</pre>
We use our skeleton feedhandler to sample some trade messages:
{"table":"trade","action":"insert","data":[{"timestamp":"2021-06-19T23:12:15.550Z","symbol":"XBTUSD","side":"Buy","size":5700,"price":35508.5,"tickDirection":"ZeroPlusTick","trdMatchID":"8c35aab7-2225-b52b-55f5-d824c2f240b5","grossValue":16052511,"homeNotional":0.16052511,"foreignNotional":5700}]}
and some order book update messages:
{"table":"orderBook10","action":"update","data":[{"symbol":"XBTUSD","bids":[[35579.5,212300],[35574,248000],[35573,65000],[35572,23500],[35571.5,176300],[35570.5,23500],[35570,83000],[35569.5,155700],[35569,10700],[35568.5,62500]],"timestamp":"2021-06-19T23:13:56.340Z","asks":[[35580,82300],[35583,6300],[35584,1500],[35584.5,400],[35586.5,100],[35588.5,600],[35589,2600],[35589.5,500],[35590,10000],[35590.5,209200]]}]}
Looking at these messages, it is easy to come up with the table schemas, which we save in <tt>C:\q\tick\bitmex.q</tt>:
<pre>
quote:([]time:`timespan$(); sym:`g#`symbol$(); feedhandlerTime:`timestamp$(); exchangeTime:`timestamp$();
  bid1:`float$(); bid2:`float$(); bid3:`float$(); bid4:`float$(); bid5:`float$();
  bid6:`float$(); bid7:`float$(); bid8:`float$(); bid9:`float$(); bid10:`float$();
  ask1:`float$(); ask2:`float$(); ask3:`float$(); ask4:`float$(); ask5:`float$();
  ask6:`float$(); ask7:`float$(); ask8:`float$(); ask9:`float$(); ask10:`float$();
  bidSize1:`float$(); bidSize2:`float$(); bidSize3:`float$(); bidSize4:`float$(); bidSize5:`float$();
  bidSize6:`float$(); bidSize7:`float$(); bidSize8:`float$(); bidSize9:`float$(); bidSize10:`float$();
  askSize1:`float$(); askSize2:`float$(); askSize3:`float$(); askSize4:`float$(); askSize5:`float$();
  askSize6:`float$(); askSize7:`float$(); askSize8:`float$(); askSize9:`float$(); askSize10:`float$())

trade:([]time:`timespan$(); sym:`g#`symbol$(); feedhandlerTime:`timestamp$(); exchangeTime:`timestamp$();
  side:`symbol$(); size:`float$(); price:`float$(); tickDirection:`symbol$(); trdMatchID:();
  grossValue:`float$(); homeNotional:`float$(); foreignNotional:`float$())
</pre>
We then launch the tickerplant as
<pre>
cd C:/q/w64
q.exe tick.q bitmex . -p 5010
</pre>
the rdb (in another terminal) as
<pre>
cd C:/q/w64
q tick/r.q -p 5011
</pre>
and the hdb (in another terminal) as
<pre>
cd C:/q/w64
q bitmex -p 5012
</pre>
(in that order).
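As a quick sanity check that the schemas from <tt>bitmex.q</tt> have been picked up, we can connect to the rdb from yet another q instance and inspect them. A sketch, assuming the ports chosen above:
<pre>
q)h:hopen`:localhost:5011
q)h"tables[]"      / expect `quote`trade
q)h"meta trade"    / column names and types should match the schema file
q)hclose h
</pre>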
We complete the implementation of our feedhandler as follows:
<pre>
import argparse
import datetime as dt
import json
import logging
import os
import sys
import time

import websocket

try:
    import thread
except ImportError:
    import _thread as thread

q = None                      # qPython connection to the tickerplant
last_message_datetime = None  # time of the last message seen, used by the watchdog
message_count = 0


def on_message(ws, message):
    global last_message_datetime
    global message_count
    global q
    try:
        if q is not None:
            import numpy as np
            from qpython.qcollection import qlist
            from qpython.qtemporal import array_to_raw_qtemporal
            from qpython.qtype import QDOUBLE_LIST, QSTRING_LIST, QSYMBOL_LIST, QTIMESTAMP_LIST
            o = json.loads(message)
            if 'table' in o:
                if o['table'] == 'orderBook10':
                    for datum in o['data']:
                        exchange_timestamp = datum['timestamp']    # E.g. 2019-05-09T05:47:29.457Z
                        python_exchange_datetime = dt.datetime.strptime(exchange_timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
                        symbol = datum['symbol']
                        data = [
                            qlist([symbol], qtype=QSYMBOL_LIST),
                            qlist([np.datetime64(dt.datetime.now(), 'ns')], qtype=QTIMESTAMP_LIST),
                            qlist([np.datetime64(python_exchange_datetime, 'ns')], qtype=QTIMESTAMP_LIST)
                        ]
                        bids = datum['bids']
                        asks = datum['asks']
                        # column order must match the quote schema: bids, asks, bid sizes, ask sizes
                        for bididx, (bid, bid_size) in enumerate(bids):
                            data.append(qlist([bid], qtype=QDOUBLE_LIST))
                        for askidx, (ask, ask_size) in enumerate(asks):
                            data.append(qlist([ask], qtype=QDOUBLE_LIST))
                        for bididx, (bid, bid_size) in enumerate(bids):
                            data.append(qlist([bid_size], qtype=QDOUBLE_LIST))
                        for askidx, (ask, ask_size) in enumerate(asks):
                            data.append(qlist([ask_size], qtype=QDOUBLE_LIST))
                        q.sendSync('.u.upd', np.string_('quote'), data)
                elif o['table'] == 'trade':
                    for datum in o['data']:
                        exchange_timestamp = datum['timestamp']    # E.g. 2019-05-09T05:47:29.457Z
                        python_exchange_datetime = dt.datetime.strptime(exchange_timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
                        symbol = datum['symbol']
                        side = datum['side']
                        size = datum['size']
                        price = datum['price']
                        tickDirection = datum['tickDirection']
                        trdMatchID = datum['trdMatchID']
                        grossValue = datum['grossValue']
                        homeNotional = datum['homeNotional']
                        foreignNotional = datum['foreignNotional']
                        q.sendSync('.u.upd', np.string_('trade'), [
                            qlist([symbol], qtype=QSYMBOL_LIST),
                            qlist([np.datetime64(dt.datetime.now(), 'ns')], qtype=QTIMESTAMP_LIST),
                            qlist([np.datetime64(python_exchange_datetime, 'ns')], qtype=QTIMESTAMP_LIST),
                            qlist([side], qtype=QSYMBOL_LIST),
                            qlist([size], qtype=QDOUBLE_LIST),
                            qlist([price], qtype=QDOUBLE_LIST),
                            qlist([np.string_(tickDirection)], qtype=QSYMBOL_LIST),
                            qlist([trdMatchID], qtype=QSTRING_LIST),
                            qlist([grossValue], qtype=QDOUBLE_LIST),
                            qlist([homeNotional], qtype=QDOUBLE_LIST),
                            qlist([foreignNotional], qtype=QDOUBLE_LIST)
                        ])
        last_message_datetime = dt.datetime.now()
        message_count += 1
    except Exception as e:
        logging.error(e)
        sys.exit(1)


def on_error(ws, error):
    logging.error(error)
    sys.exit(1)


def on_open(ws):
    logging.info('Opened WebSocket connection')

    # watchdog thread: exit if no messages have been seen for a minute
    def run(*args):
        global last_message_datetime
        global message_count
        while True:
            time.sleep(60)
            if last_message_datetime is None or (dt.datetime.now() - last_message_datetime).total_seconds() > 60.:
                print('!!! Have not seen any messages for one minute, exiting')
                ws.close()
                sys.exit()
            else:
                print(dt.datetime.now(), '...received %d messages...' % message_count)

    thread.start_new_thread(run, ())


def on_close(ws):
    print('Closed WebSocket connection')


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='BitMEX feedhandler')
    parser.add_argument('--host', help='kdb+/q tickerplant host', action='store', dest='host', default='localhost')
    parser.add_argument('--port', help='kdb+/q tickerplant port', action='store', dest='port', type=int, default=5010)
    parser.add_argument('--console-log-level', help='console log level: 10=DEBUG, 20=INFO, 30=WARN, 40=ERROR, 50=CRITICAL', action='store', dest='console_log_level', type=int, default=20)
    parser.add_argument('--file-log-level', help='file log level: 10=DEBUG, 20=INFO, 30=WARN, 40=ERROR, 50=CRITICAL', action='store', dest='file_log_level', type=int, default=20)
    parser.add_argument('--log-file', help='path to log file', action='store', dest='log_file', default=None)
    args = parser.parse_args()

    if args.log_file is not None:
        logging.basicConfig(
            filename=args.log_file,  # log to the file specified via --log-file
            level=args.file_log_level,
            format='%(asctime)s %(pathname)s:%(lineno)d %(levelname)s %(message)s'
        )
        console = logging.StreamHandler()
        console.setLevel(logging.DEBUG)
        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
        console.setFormatter(formatter)
        logging.getLogger('').addHandler(console)
    else:
        logging.basicConfig(
            level=args.console_log_level,
            format='%(asctime)s %(levelname)s %(message)s'
        )

    logging.info('Starting BitMEX Feedhandler')
    logging.debug('host: %s' % args.host)
    logging.debug('port: %s' % args.port)
    logging.debug('console-log-level: %s' % args.console_log_level)
    logging.debug('file-log-level: %s' % args.file_log_level)
    logging.debug('log-file: %s' % args.log_file)

    logging.info('Connecting to tickerplant on host=%s, port=%d' % (args.host, args.port))
    from qpython import qconnection
    q = qconnection.QConnection(host=args.host, port=args.port)
    q.open()

    logging.info('Opening WebSocket connection')
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://www.bitmex.com/realtime?subscribe=orderBook10:XBTUSD,trade:XBTUSD",
                                on_message = on_message,
                                on_error = on_error,
                                on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()
</pre>
We are now ready to launch the feedhandler:
python bitmex_feedhandler.py
Watch the tables on the rdb populate.
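For example, from a separate q instance (again assuming the rdb port 5011 chosen above):
<pre>
q)h:hopen`:localhost:5011
q)h"count each `trade`quote!(trade;quote)"               / row counts should keep growing
q)h"-5#select time, sym, side, price, size from trade"   / the five most recent trades
q)hclose h
</pre>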
==Visualization tool==
Finally, we prototype a (very basic) visualization tool for our data, based on the Python tickerplant subscriber introduced above. Here is the code, which can be improved in many ways:
<pre>
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import numpy as np
import random
import sys
import threading
import time

from qpython import qconnection
from qpython.qtype import QException
from qpython.qconnection import MessageType
from qpython.qcollection import QTable


class ListenerThread(threading.Thread):

    def __init__(self, q, data_class):
        super(ListenerThread, self).__init__()
        self.q = q
        self._data_class = data_class
        self._stopper = threading.Event()

    def stopit(self):
        self._stopper.set()

    def stopped(self):
        return self._stopper.is_set()

    def run(self):
        while not self.stopped():
            print('.')
            try:
                message = self.q.receive(data_only = False, raw = False)  # retrieve entire message

                if message.type != MessageType.ASYNC:
                    print('Unexpected message, expected message of type: ASYNC')

                print('type: %s, message type: %s, data size: %s, is_compressed: %s ' % (type(message), message.type, message.size, message.is_compressed))

                if isinstance(message.data, list):
                    # unpack upd message
                    if len(message.data) == 3 and message.data[0] == b'upd' and isinstance(message.data[2], QTable):
                        for row in message.data[2]:
                            print(row)
                            print(row[2], row[6])
                            self._data_class.x_data.append(row[2])
                            self._data_class.y_data.append(row[6])
            except QException as e:
                print(e)


class DataClass():

    def __init__(self):
        self.x_data = []
        self.y_data = []


class PlotClass():

    def __init__(self, data_class):
        self._data_class = data_class
        self.line, = plt.plot([], [], 'o')
        self.ani = FuncAnimation(plt.gcf(), self.run, interval=1000, repeat=True)

    def run(self, i):
        print("plotting data")
        self.line.set_data(self._data_class.x_data, self._data_class.y_data)
        self.line.axes.relim()
        self.line.axes.autoscale_view()


with qconnection.QConnection(host = 'localhost', port = 5010) as q:
    print(q)
    print('IPC version: %s. Is connected: %s' % (q.protocol_version, q.is_connected()))
    print('Press <ENTER> to close application')

    # subscribe to tick
    print(dir(q))
    response = q.sendSync('.u.sub', np.string_('trade'), np.string_(''))

    # get table model
    if isinstance(response[1], QTable):
        print('%s table data model: %s' % (response[0], response[1].dtype))

    data = DataClass()
    plotter = PlotClass(data)

    t = ListenerThread(q, data)
    t.start()

    plt.show()
</pre>
And here is the result of running it for a few minutes: