Skip to content

Instantly share code, notes, and snippets.

@tori-takashi
Created January 18, 2020 09:41
Show Gist options
  • Save tori-takashi/b4e9e9732673474c7d4ed820fa3add0d to your computer and use it in GitHub Desktop.
Save tori-takashi/b4e9e9732673474c7d4ed820fa3add0d to your computer and use it in GitHub Desktop.
#updated at 2020/1/18
#!/usr/bin/python3
import decimal
import hashlib
import hmac
import json
import logging
import math
import threading
import time
import traceback
import urllib
import urllib.parse
from time import sleep

import websocket
# Naive implementation of connecting to BitMEX websocket for streaming realtime data.
# The Marketmaker still interacts with this as if it were a REST Endpoint, but now it can get
# much more realtime data without polling the hell out of the API.
#
# The Websocket offers a bunch of data as raw properties right on the object.
# On connect, it synchronously asks for a push of all this data then returns.
# Right after, the MM can start using its data. It will be updated in realtime, so the MM can
# poll really often if it wants.
class BitMEXWebsocket:
    """Streaming BitMEX client that mirrors subscribed tables in memory.

    The constructor connects, subscribes through the connection querystring
    and blocks until the initial table images ("partials") have arrived.
    Afterwards the public accessors read from ``self.data``, which the
    websocket thread keeps up to date.

    Attributes:
        data: table name -> list of row dicts.
        keys: table name -> names of the fields that identify a row.
        itemIdxs: table name -> {row key string: position in ``data[table]``},
            used by ``findItemByKeys`` to avoid a linear scan.
    """

    # Don't grow a table larger than this amount. Helps cap memory usage.
    MAX_TABLE_LEN = 200

    def __init__(self, endpoint, symbol, api_key=None, api_secret=None):
        """Connect to the websocket and initialize data stores.

        Args:
            endpoint: Base API URL; its scheme is rewritten from http(s) to ws(s).
            symbol: Instrument symbol used to scope the subscriptions.
            api_key: Optional API key. Must be given together with api_secret.
            api_secret: Optional API secret. Must be given together with api_key.

        Raises:
            ValueError: If only one of api_key / api_secret is provided.
            websocket.WebSocketTimeoutException: If the socket does not connect.
        """
        self.logger = logging.getLogger(__name__)
        self.logger.debug("Initializing WebSocket.")

        self.endpoint = endpoint
        self.symbol = symbol

        if api_key is not None and api_secret is None:
            raise ValueError('api_secret is required if api_key is provided')
        if api_key is None and api_secret is not None:
            raise ValueError('api_key is required if api_secret is provided')

        self.api_key = api_key
        self.api_secret = api_secret

        self.data = {}
        self.keys = {}
        self.exited = False
        # Per-table lookup index that speeds up findItemByKeys.
        self.itemIdxs = {}

        # We can subscribe right in the connection querystring, so let's build that.
        # Subscribe to all pertinent endpoints
        wsURL = self.__get_url()
        print("Connecting to %s" % wsURL)
        self.__connect(wsURL, symbol)
        print('Connected to WS.')

        # Connected. Wait for partials
        self.__wait_for_symbol(symbol)
        if api_key:
            self.__wait_for_account()
        self.logger.info('Got all market data. Starting.')

    def generate_nonce(self):
        """Return the current Unix time plus 3000 seconds, as an integer."""
        return int(round(time.time() + 3000))

    # Generates an API signature.
    # A signature is HMAC_SHA256(secret, verb + path + nonce + data), hex encoded.
    # Verb must be uppercased, url is relative, nonce must be an increasing 64-bit integer
    # and the data, if present, must be JSON without whitespace between keys.
    #
    # For example, in pseudocode (and in real code below):
    #
    # verb=POST
    # url=/api/v1/order
    # nonce=1416993995705
    # data={"symbol":"XBTZ14","quantity":1,"price":395.01}
    # signature = HEX(HMAC_SHA256(secret, 'POST/api/v1/order1416993995705{"symbol":"XBTZ14","quantity":1,"price":395.01}'))
    def generate_signature(self, secret, verb, url, nonce, data):
        """Generate a request signature compatible with BitMEX.

        Args:
            secret: API secret used as the HMAC key.
            verb: HTTP verb, e.g. 'GET'.
            url: Absolute or relative URL; only path and query are signed.
            nonce: Value converted with str() and appended after the path.
            data: Request body as a string ('' when there is none).

        Returns:
            The hex-encoded HMAC-SHA256 digest.
        """
        # Parse the url so we can remove the base and extract just the path.
        parsedURL = urllib.parse.urlparse(url)
        path = parsedURL.path
        if parsedURL.query:
            path = path + '?' + parsedURL.query

        message = (verb + path + str(nonce) + data).encode('utf-8')
        return hmac.new(secret.encode('utf-8'), message,
                        digestmod=hashlib.sha256).hexdigest()

    def exit(self):
        """Call this to exit - will close websocket."""
        self.exited = True
        self.ws.close()

    def get_instrument(self):
        """Get the raw instrument data for this symbol.

        Adds a 'tickLog' entry: the number of decimal places needed to
        represent 'tickSize' (0.5 -> 1, 0.01 -> 2, 1 -> 0).
        """
        instrument = self.data['instrument'][0]
        # Use the decimal exponent rather than log10: log10 truncates tick
        # sizes such as 0.5 or 0.25 to too few decimal places.
        tick_size = decimal.Decimal(str(instrument['tickSize'])).normalize()
        instrument['tickLog'] = max(0, -tick_size.as_tuple().exponent)
        return instrument

    def get_ticker(self):
        """Return a ticker object. Generated from quote and trade.

        Returns:
            Dict with 'last', 'buy', 'sell' and 'mid', each rounded to the
            instrument's tick precision.
        """
        lastQuote = self.data['quote'][-1]
        lastTrade = self.data['trade'][-1]
        ticker = {
            "last": lastTrade['price'],
            "buy": lastQuote['bidPrice'],
            "sell": lastQuote['askPrice'],
            "mid": (float(lastQuote['bidPrice'] or 0) + float(lastQuote['askPrice'] or 0)) / 2
        }

        # The instrument has a tickSize. Use it to round values. Going through
        # get_instrument() guarantees 'tickLog' exists even on the first call.
        instrument = self.get_instrument()
        return {k: round(float(v or 0), instrument['tickLog']) for k, v in ticker.items()}

    def funds(self):
        """Get your margin details."""
        return self.data['margin'][0]

    def market_depth(self):
        """Get market depth (orderbook). Returns all levels."""
        return self.data['orderBookL2']

    def open_orders(self, clOrdIDPrefix):
        """Get all your open orders whose clOrdID starts with clOrdIDPrefix."""
        orders = self.data['order']
        # Filter to only open orders and those that we actually placed
        return [o for o in orders
                if str(o['clOrdID']).startswith(clOrdIDPrefix) and self.order_leaves_quantity(o)]

    def recent_trades(self):
        """Get recent trades."""
        return self.data['trade']

    #
    # End Public Methods
    #

    def __is_connected(self):
        """Return True once the underlying socket exists and reports connected."""
        sock = self.ws.sock
        return bool(sock and sock.connected)

    def __connect(self, wsURL, symbol):
        """Connect to the websocket in a daemon thread.

        Raises:
            websocket.WebSocketTimeoutException: If the socket is still not
                connected after polling five times, one second apart.
        """
        self.logger.debug("Starting thread")

        self.ws = websocket.WebSocketApp(wsURL,
                                         on_message=self.__on_message,
                                         on_close=self.__on_close,
                                         on_open=self.__on_open,
                                         on_error=self.__on_error,
                                         header=self.__get_auth())

        self.wst = threading.Thread(target=lambda: self.ws.run_forever())
        self.wst.daemon = True
        self.wst.start()
        self.logger.debug("Started thread")

        # Wait for connect before continuing
        conn_timeout = 5
        while not self.__is_connected() and conn_timeout:
            sleep(1)
            conn_timeout -= 1
        # Test the connection itself, not the counter: the socket may have
        # connected during the final sleep.
        if not self.__is_connected():
            self.logger.error("Couldn't connect to WS! Exiting.")
            self.exit()
            raise websocket.WebSocketTimeoutException(
                'Couldn\'t connect to WS! Exiting.')

    def __get_auth(self):
        """Return auth headers. Will use the API key if one was provided."""
        if self.api_key:
            self.logger.info("Authenticating with API Key.")
            # To auth to the WS using an API key, we generate a signature of a nonce and
            # the WS API endpoint.
            expires = self.generate_nonce()
            return [
                "api-expires: " + str(expires),
                "api-signature: " +
                self.generate_signature(
                    self.api_secret, 'GET', '/realtime', expires, ''),
                "api-key:" + self.api_key
            ]
        else:
            self.logger.info("Not authenticating.")
            return []

    def __get_url(self):
        """Generate a connection URL.

        We can define subscriptions right in the querystring.
        Most subscription topics are scoped by the symbol we're listening to.
        """
        # You can sub to orderBookL2 for all levels, or orderBook10 for top 10 levels & save bandwidth
        symbolSubs = ["execution", "instrument", "order",
                      "orderBookL2", "position", "quote", "trade"]
        genericSubs = ["margin"]

        subscriptions = [sub + ':' + self.symbol for sub in symbolSubs]
        subscriptions += genericSubs

        urlParts = list(urllib.parse.urlparse(self.endpoint))
        urlParts[0] = urlParts[0].replace('http', 'ws')
        urlParts[2] = "/realtime?subscribe={}".format(','.join(subscriptions))
        return urllib.parse.urlunparse(urlParts)

    def __wait_for_account(self):
        """On subscribe, this data will come down. Wait for it."""
        # Wait for the account tables to show up from the ws
        while not {'margin', 'position', 'order', 'orderBookL2'} <= set(self.data):
            sleep(0.1)

    def __wait_for_symbol(self, symbol):
        """On subscribe, this data will come down. Wait for it."""
        while not {'instrument', 'trade', 'quote'} <= set(self.data):
            sleep(0.1)

    def __send_command(self, command, args=None):
        """Send a raw command."""
        if args is None:
            args = []
        self.ws.send(json.dumps({"op": command, "args": args}))

    @staticmethod
    def __item_key(keys, item):
        """Build the index key of a row from its identifying fields.

        Fields are taken in the order given by ``keys`` so that the stored row
        and an incoming update produce the same string regardless of the key
        order inside each dict. Fields absent from ``item`` are skipped.
        """
        return "-".join(str(item[k]) for k in keys if k in item)

    def __reindex(self, table):
        """Rebuild the lookup index of ``table`` from scratch.

        Starting from an empty dict drops entries of rows that no longer exist.
        """
        keys = self.keys[table]
        self.itemIdxs[table] = {
            self.__item_key(keys, item): position
            for position, item in enumerate(self.data[table])
        }

    def __remove_items(self, table, items):
        """Remove the given row objects from ``table`` in place and reindex."""
        doomed = {id(item) for item in items}
        rows = self.data[table]
        rows[:] = [row for row in rows if id(row) not in doomed]
        self.__reindex(table)

    def __on_message(self, *args):
        """Handler for parsing WS messages.

        Accepts either ``(message)`` or ``(ws, message)`` so it works whether
        or not the websocket library passes the app object as first argument.
        """
        message = json.loads(args[-1])
        self.logger.debug(json.dumps(message))

        table = message['table'] if 'table' in message else None
        action = message['action'] if 'action' in message else None
        try:
            if 'subscribe' in message:
                self.logger.debug("Subscribed to %s." % message['subscribe'])
            elif action:
                if table not in self.data:
                    self.data[table] = []
                # Make sure an index exists for this table.
                if table not in self.itemIdxs:
                    self.itemIdxs[table] = {}
                # Rows can arrive before the partial that carries the keys.
                if table not in self.keys:
                    self.keys[table] = []

                # There are four possible actions from the WS:
                # 'partial' - full table image
                # 'insert'  - new row
                # 'update'  - update row
                # 'delete'  - delete row
                if action == 'partial':
                    self.logger.debug("%s: partial" % table)
                    self.data[table] = message['data']
                    # Keys are communicated on partials to let you know how to uniquely identify
                    # an item. We use it for updates.
                    self.keys[table] = message['keys']
                    self.__reindex(table)
                elif action == 'insert':
                    self.logger.debug('%s: inserting %s' %
                                      (table, message['data']))
                    rows = self.data[table]
                    first_new = len(rows)
                    rows += message['data']
                    # Index every inserted row, not only the last one.
                    index = self.itemIdxs[table]
                    keys = self.keys[table]
                    for offset, item in enumerate(message['data']):
                        index[self.__item_key(keys, item)] = first_new + offset

                    # Limit the max length of the table to avoid excessive memory usage.
                    # Don't trim orders because we'll lose valuable state if we do.
                    if table not in ['order', 'orderBookL2'] and len(rows) > BitMEXWebsocket.MAX_TABLE_LEN:
                        self.data[table] = rows[BitMEXWebsocket.MAX_TABLE_LEN // 2:]
                        # Positions shifted and old rows are gone: rebuild.
                        self.__reindex(table)
                elif action == 'update':
                    self.logger.debug('%s: updating %s' %
                                      (table, message['data']))
                    finished = []
                    # Locate the item in the collection and update it.
                    for updateData in message['data']:
                        item = self.findItemByKeys(
                            self.keys[table], self.data[table], updateData, self.itemIdxs[table])
                        if item is None:
                            # No item found to update. Could happen before push.
                            # Keep going so the remaining rows are still applied.
                            continue
                        item.update(updateData)
                        # Remove cancelled / filled orders
                        if table == 'order' and not self.order_leaves_quantity(item):
                            finished.append(item)
                    # Removal is deferred so the index stays valid during the loop.
                    if finished:
                        self.__remove_items(table, finished)
                elif action == 'delete':
                    self.logger.debug('%s: deleting %s' %
                                      (table, message['data']))
                    # Resolve every row first, while the index still matches
                    # the table, then remove them all at once.
                    doomed = []
                    for deleteData in message['data']:
                        item = self.findItemByKeys(
                            self.keys[table], self.data[table], deleteData, self.itemIdxs[table])
                        if item is not None:
                            doomed.append(item)
                    self.__remove_items(table, doomed)
                else:
                    raise Exception("Unknown action: %s" % action)
        except Exception:
            self.logger.error(traceback.format_exc())

    def __on_error(self, *args):
        """Called on fatal websocket errors. We exit on these.

        The error is the last positional argument (an app object may precede it).
        """
        error = args[-1]
        if not self.exited:
            self.logger.error("Error : %s" % error)
            raise websocket.WebSocketException(error)

    def __on_open(self, *args):
        """Called when the WS opens."""
        self.logger.debug("Websocket Opened.")

    def __on_close(self, *args):
        """Called on websocket close."""
        self.logger.info('Websocket Closed')

    # Utility method for finding an item in the store.
    # When an update comes through on the websocket, we need to figure out which item in the array it is
    # in order to match that item.
    #
    # Helpfully, on a data push (or on an HTTP hit to /api/v1/schema), we have a "keys" array. These are the
    # fields we can use to uniquely identify an item. Sometimes there is more than one, so we use all
    # provided keys.
    def findItemByKeys(self, keys, table, matchData, itemIdxs):
        """Look up the row of ``table`` that ``matchData`` refers to.

        Args:
            keys: Names of the identifying fields of the table.
            table: List of row dicts.
            matchData: Dict carrying (at least) the identifying fields.
            itemIdxs: Index of the table, {row key string: position}.

        Returns:
            The matching row dict, or None when the key is not indexed or the
            indexed position lies outside ``table``.
        """
        position = itemIdxs.get(self.__item_key(keys, matchData))
        if position is not None and position < len(table):
            return table[position]
        return None

    def order_leaves_quantity(self, o):
        """Return True if the order still has quantity left (or it is unknown)."""
        if o['leavesQty'] is None:
            return True
        return o['leavesQty'] > 0
""" Old logic
for item in table:
matched = True
for key in keys:
if item[key] != matchData[key]:
matched = False
if matched:
end = time.time()
self.timemark['find'] += (end - start)
return item
The example below is commented out because we do not need it.
if __name__ == '__main__':
conf = config_main.Config()
ep = "https://www.bitmex.com/api/v1"
ex = BitMEXWebsocket(symbol='XBTUSD', endpoint=ep, api_key=conf.API_KEY, api_secret=conf.SECRET )
ex.get_instrument()
if(ex.ws.sock.connect):
ticker = ex.get_ticker()
print(ticker)
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment