Skip to content

Instantly share code, notes, and snippets.

@franzwong
Created April 21, 2024 02:44
Show Gist options
  • Save franzwong/fd47388d0262334be91a95f57f11f65d to your computer and use it in GitHub Desktop.
Save franzwong/fd47388d0262334be91a95f57f11f65d to your computer and use it in GitHub Desktop.
Mock kdb+/q ticker plant
import socket
import struct
import sys
import threading
import numpy as np
from qpython import qconnection, MetaData, CONVERSION_OPTIONS
from qpython._pandas import PandasQReader, PandasQWriter
from qpython.qcollection import QTable, qlist, qtable
from qpython.qconnection import MessageType
from qpython.qreader import QReader, QMessage
from qpython.qtype import QException, QSYMBOL_LIST, QFLOAT_LIST
from qpython.qwriter import QWriter
class ClientConnection:
    """Server-side wrapper around one accepted client socket.

    Gives raw byte access for the login handshake and, once
    :meth:`initialize` has been called, message-level access through
    qPython's pandas-aware reader and writer.
    """

    def __init__(self, encoding: str, connection: socket.socket, address):
        """Remember the accepted socket; performs no I/O.

        Args:
            encoding: Text encoding handed to the qPython reader and writer.
            connection: Connected socket for this client.
            address: Peer address that was returned together with the socket.
        """
        self._encoding = encoding
        self._conn = connection
        self._addr = address
        # File object created by initialize(); kept so close() can release it.
        self._conn_file = None
        self._reader: QReader | None = None
        self._writer: QWriter | None = None

    def initialize(self, ipc_version):
        """Create the message reader and writer for this connection.

        Args:
            ipc_version: Protocol version passed to the writer.
        """
        # Keep a reference to the buffered file: while it stays open,
        # socket.close() alone does not release the underlying descriptor.
        self._conn_file = self._conn.makefile('rb')
        self._reader = PandasQReader(self._conn_file, encoding=self._encoding)
        self._writer = PandasQWriter(self._conn, protocol_version=ipc_version, encoding=self._encoding)

    def close(self):
        """Close the read-side file object (if any) and then the socket."""
        try:
            if self._conn_file is not None:
                self._conn_file.close()
                self._conn_file = None
        finally:
            self._conn.close()

    def raw_recv(self, buffer_size, flags=0):
        """Receive up to ``buffer_size`` raw bytes from the socket."""
        return self._conn.recv(buffer_size, flags)

    def raw_send(self, data, flags=0):
        """Send all of ``data`` as raw bytes.

        ``sendall`` is used instead of ``send`` because ``send`` may transmit
        only part of the buffer and its return value was being ignored.
        """
        self._conn.sendall(data, flags)

    def write(self, data, msg_type, **options):
        """Serialize ``data`` as a message of ``msg_type`` via the writer.

        Requires :meth:`initialize` to have been called first.
        """
        self._writer.write(data, msg_type, **options)

    def read(self, source=None, **options) -> "QMessage":
        """Read and return the next message via the reader.

        Requires :meth:`initialize` to have been called first.
        """
        return self._reader.read(source, **options)
class MockServer:
    """Single-client mock of a kdb+ tickerplant.

    ``run`` accepts one client, answers the login handshake, waits for a
    ``.u.sub`` request, replies with the table schema and then pushes an
    ``upd`` message every ``tick_interval_seconds`` until the shared stop
    event is set.
    """

    # How often the accept loop wakes up to re-check the stop event.
    _ACCEPT_POLL_SECONDS = 0.2

    def __init__(self, host: str, port: int, stopper: threading.Event, encoding: str = "latin-1", tick_interval_seconds: int = 1):
        """Store the configuration; the socket is only opened by ``run``.

        Args:
            host: Interface to bind the listening socket to.
            port: TCP port to listen on.
            stopper: Event that ends every loop in ``run`` once it is set.
            encoding: Encoding used to decode request symbols and passed to
                the client connection.
            tick_interval_seconds: Pause between two published ticks.
        """
        self._host = host
        self._port = port
        self._stopper = stopper
        self._encoding = encoding
        self._tick_interval_seconds = tick_interval_seconds
        self._options = MetaData(**CONVERSION_OPTIONS.union_dict())

    def set_encoding(self, encoding: str):
        """Replace the encoding used for subsequently accepted clients."""
        self._encoding = encoding

    def set_tick_interval_seconds(self, interval: int):
        """Replace the pause between two published ticks."""
        self._tick_interval_seconds = interval

    def get_table_name(self):
        """Return the name of the published table as bytes."""
        return b"trade"

    def get_mock_tick(self):
        """Build a one-row table with a ``sym`` and an ``ask`` column."""
        columns = qlist(['sym', 'ask'], qtype=QSYMBOL_LIST)
        return qtable(columns, [qlist(["USDJPY"], qtype=QSYMBOL_LIST), qlist([154.9], qtype=QFLOAT_LIST)])

    def stop(self):
        """Signal ``run`` to finish by setting the stop event."""
        self._stopper.set()

    def run(self):
        """Serve a single client until the stop event is set.

        Returns without serving anyone if the stop event is set before a
        client connects. The client socket is always closed on exit.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
            listener.bind((self._host, self._port))
            listener.listen()
            accepted = self._accept_client(listener)
            if accepted is None:
                return
            conn, addr = accepted
            client_conn = ClientConnection(self._encoding, conn, addr)
            try:
                print("Server: Connected to a client")
                client_conn.initialize(self._handshake(client_conn))
                # The conversion options never change while serving, so
                # build the keyword dict once instead of per message.
                options = self._options.union_dict()
                self._await_subscription(client_conn, options)
                self._publish_ticks(client_conn, options)
            finally:
                client_conn.close()

    def _accept_client(self, listener):
        """Wait for one client, giving up as soon as the stop event is set.

        A plain blocking ``accept()`` could never be interrupted by
        ``stop()``, which left the thread stuck forever when no client
        connected. Returns ``(conn, addr)`` or ``None`` when stopped.
        """
        listener.settimeout(self._ACCEPT_POLL_SECONDS)
        while not self._stopper.is_set():
            try:
                conn, addr = listener.accept()
            except socket.timeout:
                continue
            # Make sure the per-client socket blocks regardless of the
            # polling timeout configured on the listening socket.
            conn.setblocking(True)
            return conn, addr
        return None

    def _handshake(self, client_conn):
        """Answer the login message and return the negotiated IPC version.

        The second-to-last byte of the received login bytes is echoed back
        as a single byte (0 when fewer than two bytes arrived).
        NOTE(review): presumably that byte is the client's kdb+ capability
        byte preceding the terminating NUL - confirm against the protocol.
        """
        login = client_conn.raw_recv(99)
        ipc_version = login[-2] if len(login) > 1 else 0
        client_conn.raw_send(struct.pack("B", ipc_version))
        return ipc_version

    def _await_subscription(self, client_conn, options):
        """Read requests until ``.u.sub`` arrives, then send the schema reply."""
        while not self._stopper.is_set():
            request = client_conn.read(**options)
            if request.data[0].decode(self._encoding) == ".u.sub":
                print(f"Client subscribed to table {request.data[1].decode(self._encoding)}")
                table = self.get_mock_tick()
                client_conn.write([self.get_table_name(), table], MessageType.RESPONSE, **options)
                break

    def _publish_ticks(self, client_conn, options):
        """Push an async ``upd`` message per interval until stopped."""
        while not self._stopper.is_set():
            table = self.get_mock_tick()
            client_conn.write([b"upd", self.get_table_name(), table], MessageType.ASYNC, **options)
            # Event.wait doubles as an interruptible sleep.
            self._stopper.wait(self._tick_interval_seconds)
class Listener:
    """Prints the rows of every ``upd`` table received on a q connection."""

    def __init__(self, stopper: threading.Event, q: "qconnection.QConnection"):
        """Keep the stop event and the connection to receive from."""
        self._stopper = stopper
        self._q = q

    def run(self):
        """Receive messages until the stop event is set.

        A message whose type is not ASYNC is reported but still inspected.
        When the payload is a three-element list starting with ``b'upd'``
        whose last element is a ``QTable``, each row is printed. A
        ``QException`` raised along the way is printed and the loop goes on.
        """
        while not self._stopper.is_set():
            try:
                incoming = self._q.receive(data_only=False, raw=False)
                if incoming.type != MessageType.ASYNC:
                    print('Unexpected message, expected message of type: ASYNC')

                payload = incoming.data
                is_table_update = (
                    isinstance(payload, list)
                    and len(payload) == 3
                    and payload[0] == b'upd'
                    and isinstance(payload[2], QTable)
                )
                if not is_table_update:
                    continue

                for record in payload[2]:
                    print(record)
            except QException as error:
                print(error)
def get_free_port() -> int:
    """Return a TCP port number the OS reports as free on localhost.

    Binds a throwaway socket to port 0 so the OS assigns a port, reads the
    assigned number back and closes the socket again.
    """
    probe = socket.socket(socket.AF_INET, type=socket.SOCK_STREAM)
    with probe:
        probe.bind(('localhost', 0))
        # getsockname() yields (address, port) for AF_INET sockets.
        return probe.getsockname()[1]
def main():
    """Run the mock server and a subscribing client in one process.

    Starts ``MockServer`` on a free localhost port in a background thread,
    connects a qPython client, subscribes to ``trade``, prints the table
    schema from the reply and then prints incoming updates through a
    ``Listener`` thread until a line is read from stdin. The shared stop
    event is always set on the way out.
    """
    host = 'localhost'
    port = get_free_port()
    user_name = "kdbuser"
    password = "kdbpassword"

    stopper = threading.Event()
    mock_server = MockServer(host, port, stopper)
    server_thread = threading.Thread(target=mock_server.run)
    server_thread.start()
    # NOTE(review): the server binds and listens inside its own thread, so
    # the connect below can race with server start-up.
    try:
        with qconnection.QConnection(host=host, port=port, username=user_name, password=password) as q:
            # np.bytes_ instead of np.string_: the latter alias was removed
            # in NumPy 2.0, while np.bytes_ exists in every NumPy release.
            response = q.sendSync(".u.sub", np.bytes_('trade'), np.bytes_(''))
            if isinstance(response[1], QTable):
                print('%s table data model: %s' % (response[0], response[1].dtype))

            listener = Listener(stopper, q)
            listener_thread = threading.Thread(target=listener.run)
            listener_thread.start()

            # Block until the user presses Enter.
            sys.stdin.readline()
    finally:
        stopper.set()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment