Created
January 18, 2020 09:41
-
-
Save tori-takashi/b4e9e9732673474c7d4ed820fa3add0d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#updated at 2020/1/18 | |
#!/usr/bin/python3 | |
import websocket | |
import threading | |
import traceback | |
from time import sleep | |
import json | |
import logging | |
import urllib | |
import math | |
import time | |
import time | |
import urllib | |
import hmac | |
import hashlib | |
# Naive implementation of connecting to BitMEX websocket for streaming realtime data. | |
# The Marketmaker still interacts with this as if it were a REST Endpoint, but now it can get | |
# much more realtime data without polling the hell out of the API. | |
# | |
# The Websocket offers a bunch of data as raw properties right on the object. | |
# On connect, it synchronously asks for a push of all this data then returns. | |
# Right after, the MM can start using its data. It will be updated in realtime, so the MM can | |
# poll really often if it wants. | |
class BitMEXWebsocket: | |
# Don't grow a table larger than this amount. Helps cap memory usage. | |
MAX_TABLE_LEN = 200 | |
def __init__(self, endpoint, symbol, api_key=None, api_secret=None): | |
'''Connect to the websocket and initialize data stores.''' | |
self.logger = logging.getLogger(__name__) | |
self.logger.debug("Initializing WebSocket.") | |
self.endpoint = endpoint | |
self.symbol = symbol | |
if api_key is not None and api_secret is None: | |
raise ValueError('api_secret is required if api_key is provided') | |
if api_key is None and api_secret is not None: | |
raise ValueError('api_key is required if api_secret is provided') | |
self.api_key = api_key | |
self.api_secret = api_secret | |
self.data = {} | |
self.keys = {} | |
self.exited = False | |
# findItemByKeys高速化のため、インデックスを作成・格納するための変数を作っておく | |
self.itemIdxs = {} | |
# 高速化のため、各処理の処理時間を格納するtimearkを作成 | |
""" | |
self.timemark = {} | |
self.timemark['partial'] = 0 | |
self.timemark['insert'] = 0 | |
self.timemark['update'] = 0 | |
self.timemark['delete'] = 0 | |
self.timemark['find'] = 0 | |
""" | |
# We can subscribe right in the connection querystring, so let's build that. | |
# Subscribe to all pertinent endpoints | |
wsURL = self.__get_url() | |
print("Connecting to %s" % wsURL) | |
self.__connect(wsURL, symbol) | |
print('Connected to WS.') | |
# Connected. Wait for partials | |
self.__wait_for_symbol(symbol) | |
if api_key: | |
self.__wait_for_account() | |
self.logger.info('Got all market data. Starting.') | |
def generate_nonce(self): | |
return int(round(time.time() + 3000)) | |
# Generates an API signature. | |
# A signature is HMAC_SHA256(secret, verb + path + nonce + data), hex encoded. | |
# Verb must be uppercased, url is relative, nonce must be an increasing 64-bit integer | |
# and the data, if present, must be JSON without whitespace between keys. | |
# | |
# For example, in psuedocode (and in real code below): | |
# | |
# verb=POST | |
# url=/api/v1/order | |
# nonce=1416993995705 | |
# data={"symbol":"XBTZ14","quantity":1,"price":395.01} | |
# signature = HEX(HMAC_SHA256(secret, 'POST/api/v1/order1416993995705{"symbol":"XBTZ14","quantity":1,"price":395.01}')) | |
def generate_signature(self, secret, verb, url, nonce, data): | |
"""Generate a request signature compatible with BitMEX.""" | |
# Parse the url so we can remove the base and extract just the path. | |
parsedURL = urllib.parse.urlparse(url) | |
path = parsedURL.path | |
if parsedURL.query: | |
path = path + '?' + parsedURL.query | |
# print "Computing HMAC: %s" % verb + path + str(nonce) + data | |
message = (verb + path + str(nonce) + data).encode('utf-8') | |
signature = hmac.new(secret.encode('utf-8'), message, | |
digestmod=hashlib.sha256).hexdigest() | |
return signature | |
def exit(self): | |
'''Call this to exit - will close websocket.''' | |
self.exited = True | |
self.ws.close() | |
def get_instrument(self): | |
'''Get the raw instrument data for this symbol.''' | |
# Turn the 'tickSize' into 'tickLog' for use in rounding | |
instrument = self.data['instrument'][0] | |
instrument['tickLog'] = int( | |
math.fabs(math.log10(instrument['tickSize']))) | |
return instrument | |
def get_ticker(self): | |
'''Return a ticker object. Generated from quote and trade.''' | |
lastQuote = self.data['quote'][-1] | |
lastTrade = self.data['trade'][-1] | |
ticker = { | |
"last": lastTrade['price'], | |
"buy": lastQuote['bidPrice'], | |
"sell": lastQuote['askPrice'], | |
"mid": (float(lastQuote['bidPrice'] or 0) + float(lastQuote['askPrice'] or 0)) / 2 | |
} | |
# The instrument has a tickSize. Use it to round values. | |
instrument = self.data['instrument'][0] | |
return {k: round(float(v or 0), instrument['tickLog']) for k, v in ticker.items()} | |
def funds(self): | |
'''Get your margin details.''' | |
return self.data['margin'][0] | |
def market_depth(self): | |
'''Get market depth (orderbook). Returns all levels.''' | |
return self.data['orderBookL2'] | |
def open_orders(self, clOrdIDPrefix): | |
'''Get all your open orders.''' | |
orders = self.data['order'] | |
# Filter to only open orders and those that we atually placed | |
return [o for o in orders if str(o['clOrdID']).startswith(clOrdIDPrefix) and self.order_leaves_quantity(o)] | |
def recent_trades(self): | |
'''Get recent trades.''' | |
return self.data['trade'] | |
# | |
# End Public Methods | |
# | |
def __connect(self, wsURL, symbol): | |
'''Connect to the websocket in a thread.''' | |
self.logger.debug("Starting thread") | |
self.ws = websocket.WebSocketApp(wsURL, | |
on_message=self.__on_message, | |
on_close=self.__on_close, | |
on_open=self.__on_open, | |
on_error=self.__on_error, | |
header=self.__get_auth()) | |
self.wst = threading.Thread(target=lambda: self.ws.run_forever()) | |
self.wst.daemon = True | |
self.wst.start() | |
self.logger.debug("Started thread") | |
# Wait for connect before continuing | |
conn_timeout = 5 | |
while (not self.ws.sock or not self.ws.sock.connected) and conn_timeout: | |
sleep(1) | |
conn_timeout -= 1 | |
if not conn_timeout: | |
self.logger.error("Couldn't connect to WS! Exiting.") | |
self.exit() | |
raise websocket.WebSocketTimeoutException( | |
'Couldn\'t connect to WS! Exiting.') | |
def __get_auth(self): | |
'''Return auth headers. Will use API time.time() if present in settings.''' | |
if self.api_key: | |
self.logger.info("Authenticating with API Key.") | |
# To auth to the WS using an API key, we generate a signature of a nonce and | |
# the WS API endpoint. | |
expires = self.generate_nonce() | |
return [ | |
"api-expires: " + str(expires), | |
"api-signature: " + | |
self.generate_signature( | |
self.api_secret, 'GET', '/realtime', expires, ''), | |
"api-key:" + self.api_key | |
] | |
else: | |
self.logger.info("Not authenticating.") | |
return [] | |
def __get_url(self): | |
''' | |
Generate a connection URL. We can define subscriptions right in the querystring. | |
Most subscription topics are scoped by the symbol we're listening to. | |
''' | |
# You can sub to orderBookL2 for all levels, or orderBook10 for top 10 levels & save bandwidth | |
symbolSubs = ["execution", "instrument", "order", | |
"orderBookL2", "position", "quote", "trade"] | |
genericSubs = ["margin"] | |
subscriptions = [sub + ':' + self.symbol for sub in symbolSubs] | |
subscriptions += genericSubs | |
urlParts = list(urllib.parse.urlparse(self.endpoint)) | |
urlParts[0] = urlParts[0].replace('http', 'ws') | |
urlParts[2] = "/realtime?subscribe={}".format(','.join(subscriptions)) | |
return urllib.parse.urlunparse(urlParts) | |
def __wait_for_account(self): | |
'''On subscribe, this data will come down. Wait for it.''' | |
# Wait for the time.time() to show up from the ws | |
while not {'margin', 'position', 'order', 'orderBookL2'} <= set(self.data): | |
sleep(0.1) | |
def __wait_for_symbol(self, symbol): | |
'''On subscribe, this data will come down. Wait for it.''' | |
while not {'instrument', 'trade', 'quote'} <= set(self.data): | |
sleep(0.1) | |
def __send_command(self, command, args=None): | |
'''Send a raw command.''' | |
if args is None: | |
args = [] | |
self.ws.send(json.dumps({"op": command, "args": args})) | |
def __on_message(self, message): | |
'''Handler for parsing WS messages.''' | |
message = json.loads(message) | |
self.logger.debug(json.dumps(message)) | |
table = message['table'] if 'table' in message else None | |
action = message['action'] if 'action' in message else None | |
try: | |
if 'subscribe' in message: | |
self.logger.debug("Subscribed to %s." % message['subscribe']) | |
elif action: | |
if table not in self.data: | |
self.data[table] = [] | |
# index格納用objにtable用のobjを追加 | |
if table not in self.itemIdxs: | |
self.itemIdxs[table] = {} | |
# keysが含まれない情報があるので、追加 | |
if table not in self.keys: | |
self.keys[table] = {} | |
# There are four possible actions from the WS: | |
# 'partial' - full table image | |
# 'insert' - new row | |
# 'update' - update row | |
# 'delete' - delete row | |
if action == 'partial': | |
# 処理時間計測開始 | |
#start = time.time() | |
self.logger.debug("%s: partial" % table) | |
self.data[table] = message['data'] | |
# time.time() are communicated on partials to let you know how to uniquely identify | |
# an item. We use it for updates. | |
self.keys[table] = message['keys'] | |
# indexを作成します | |
# self.itemIdxs[table][keyvalue(kye1val-key2val-key3val)] に | |
# 対象データのdata[table]上のインデックスが格納されます | |
for i in range(len(self.data[table])): | |
item = self.data[table][i] | |
keyvalues = "-".join([str(v) | |
for k, v in item.items() if k in self.keys[table]]) | |
self.itemIdxs[table][keyvalues] = i | |
# 処理時間計測終了・登録 | |
#end = time.time() | |
#self.timemark['partial'] += (end - start) | |
elif action == 'insert': | |
# 処理時間計測開始 | |
#start = time.time() | |
self.logger.debug('%s: inserting %s' % | |
(table, message['data'])) | |
self.data[table] += message['data'] | |
# 最後尾アイテムのindexを追加します | |
item = self.data[table][-1] | |
keyvalues = "-".join([str(v) | |
for k, v in item.items() if k in self.keys[table]]) | |
self.itemIdxs[table][keyvalues] = len(self.data[table])-1 | |
# Limit the max length of the table to avoid excessive memory usage. | |
# Don't trim orders because we'll lose valuable state if we do. | |
if table not in ['order', 'orderBookL2'] and len(self.data[table]) > BitMEXWebsocket.MAX_TABLE_LEN: | |
self.data[table] = self.data[table][int( | |
BitMEXWebsocket.MAX_TABLE_LEN / 2):] | |
# インデックスの再構築をします | |
for i in range(len(self.data[table])): | |
item = self.data[table][i] | |
keyvalues = "-".join([str(v) | |
for k, v in item.items() if k in self.keys[table]]) | |
self.itemIdxs[table][keyvalues] = i | |
# 処理時間計測終了・登録 | |
#end = time.time() | |
#self.timemark['insert'] += (end - start) | |
elif action == 'update': | |
# 処理時間計測開始 | |
#start = time.time() | |
self.logger.debug('%s: updating %s' % | |
(table, message['data'])) | |
# Locate the item in the collection and update it. | |
for updateData in message['data']: | |
# 高速化のため、itemIdxsを追加で引数指定 | |
item = self.findItemByKeys( | |
self.keys[table], self.data[table], updateData, self.itemIdxs[table]) | |
if not item: | |
return # No item found to update. Could happen before push | |
item.update(updateData) | |
# Remove cancelled / filled orders | |
if table == 'order' and not self.order_leaves_quantity(item): | |
self.data[table].remove(item) | |
# 処理時間計測終了・登録 | |
#end = time.time() | |
#self.timemark['update'] += (end - start) | |
elif action == 'delete': | |
# 処理時間計測開始 | |
#start = time.time() | |
self.logger.debug('%s: deleting %s' % | |
(table, message['data'])) | |
# Locate the item in the collection and remove it. | |
for deleteData in message['data']: | |
# 高速化のため、itemIdxsを追加で引数指定 | |
item = self.findItemByKeys( | |
self.keys[table], self.data[table], deleteData, self.itemIdxs[table]) | |
self.data[table].remove(item) | |
# インデックスの再構築をします | |
for i in range(len(self.data[table])): | |
item = self.data[table][i] | |
keyvalues = "-".join([str(v) | |
for k, v in item.items() if k in self.keys[table]]) | |
self.itemIdxs[table][keyvalues] = i | |
# 処理時間計測終了・登録 | |
#end = time.time() | |
#self.timemark['delete'] += (end - start) | |
else: | |
raise Exception("Unknown action: %s" % action) | |
except: | |
self.logger.error(traceback.format_exc()) | |
def __on_error(self, error): | |
'''Called on fatal websocket errors. We exit on these.''' | |
if not self.exited: | |
self.logger.error("Error : %s" % error) | |
raise websocket.WebSocketException(error) | |
def __on_open(self): | |
'''Called when the WS opens.''' | |
self.logger.debug("Websocket Opened.") | |
def __on_close(self): | |
'''Called on websocket close.''' | |
self.logger.info('Websocket Closed') | |
# 処理時間計測処理を加えるため、クラスメソッドに変更 | |
# Utility method for finding an item in the store. | |
# When an update comes through on the websocket, we need to figure out which item in the array it is | |
# in order to match that item. | |
# | |
# Helpfully, on a data push (or on an HTTP hit to /api/v1/schema), we have a "keys" array. These are the | |
# fields we can use to uniquely identify an item. Sometimes there is more than one, so we iterate through all | |
# provided keys. | |
def findItemByKeys(self, keys, table, matchData, itemIdxs): | |
# 処理時間計測開始 | |
#start = time.time() | |
md_keyvalue = "-".join([str(v) | |
for k, v in matchData.items() if k in keys]) | |
if md_keyvalue in itemIdxs.keys() and len(table) > itemIdxs[md_keyvalue]: | |
# 処理時間計測終了・登録 | |
#end = time.time() | |
#self.timemark['find'] += (end - start) | |
return table[itemIdxs[md_keyvalue]] | |
#end = time.time() | |
#self.timemark['find'] += (end - start) | |
def order_leaves_quantity(self, o): | |
if o['leavesQty'] is None: | |
return True | |
return o['leavesQty'] > 0 | |
""" 旧ロジック | |
for item in table: | |
matched = True | |
for key in keys: | |
if item[key] != matchData[key]: | |
matched = False | |
if matched: | |
end = time.time() | |
self.timemark['find'] += (end - start) | |
return item | |
commented out below due to useless codes for us | |
if __name__ == '__main__': | |
conf = config_main.Config() | |
ep = "https://www.bitmex.com/api/v1" | |
ex = BitMEXWebsocket(symbol='XBTUSD', endpoint=ep, api_key=conf.API_KEY, api_secret=conf.SECRET ) | |
ex.get_instrument() | |
if(ex.ws.sock.connect): | |
ticker = ex.get_ticker() | |
print(ticker) | |
""" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment