Skip to content

Instantly share code, notes, and snippets.

@arms22
Last active February 20, 2021 03:26
Show Gist options
  • Save arms22/501f4f8b9b100b0275a00e7002822ddc to your computer and use it in GitHub Desktop.
Save arms22/501f4f8b9b100b0275a00e7002822ddc to your computer and use it in GitHub Desktop.
import hmac
import json
import time
import zlib
from collections import defaultdict, deque
from itertools import zip_longest
from typing import DefaultDict, Deque, List, Dict, Tuple, Optional
from threading import Event
from websocket_manager import WebsocketManager
class FtxWebsocketClient(WebsocketManager):
    """Websocket client that caches FTX market and account data in memory.

    The public ``get_*`` methods subscribe lazily to the channel they need
    and return a snapshot of whatever has been received so far.  Incoming
    messages are routed by ``_on_message`` to one ``_handle_*`` method per
    channel.
    """

    _ENDPOINT = 'wss://ftx.com/ws/'

    def __init__(self) -> None:
        super().__init__()
        # Bounded histories; these are NOT cleared by _reset_data().
        self._trades: DefaultDict[str, Deque] = defaultdict(lambda: deque([], maxlen=10000))
        self._fills: Deque = deque([], maxlen=10000)
        self._api_key = ''  # TODO: Place your API key here
        self._api_secret = ''  # TODO: Place your API secret here
        self._subaccount = None
        # One Event per market, pulsed each time a checksum-valid update lands.
        self._orderbook_update_events: DefaultDict[str, Event] = defaultdict(Event)
        self._reset_data()

    def _on_open(self, ws):
        """Drop all per-connection state.

        NOTE(review): presumably meant as the socket's open handler; confirm
        that the base class actually registers it.
        """
        self._reset_data()

    def _reset_data(self) -> None:
        """Re-create every piece of state that is tied to one connection."""
        self._subscriptions: List[Dict] = []
        self._orders: DefaultDict[int, Dict] = defaultdict(dict)
        self._tickers: DefaultDict[str, Dict] = defaultdict(dict)
        self._orderbook_timestamps: DefaultDict[str, float] = defaultdict(float)
        self._orderbook_update_events.clear()
        self._orderbooks: DefaultDict[str, Dict[str, DefaultDict[float, float]]] = defaultdict(
            lambda: {side: defaultdict(float) for side in {'bids', 'asks'}})
        self._logged_in = False
        self._last_received_orderbook_data_at: float = 0.0

    def _reset_orderbook(self, market: str) -> None:
        """Forget the cached book and its timestamp for ``market``."""
        # pop() instead of indexing so the defaultdicts do not grow an entry.
        self._orderbooks.pop(market, None)
        self._orderbook_timestamps.pop(market, None)

    def _get_url(self) -> str:
        """Return the websocket endpoint to connect to."""
        return self._ENDPOINT

    def _login(self) -> None:
        """Send a signed ``login`` request and mark the client as logged in.

        The signature is the hex HMAC-SHA256 of ``'<ts>websocket_login'``
        keyed with the API secret, where ``ts`` is the current time in
        milliseconds.  The flag is set as soon as the request is sent; no
        reply is awaited.
        """
        ts = int(time.time() * 1000)
        signature = hmac.new(
            self._api_secret.encode(), f'{ts}websocket_login'.encode(), 'sha256').hexdigest()
        args = {'key': self._api_key, 'sign': signature, 'time': ts}
        if self._subaccount:
            args['subaccount'] = self._subaccount
        self.send_json({'op': 'login', 'args': args})
        self._logged_in = True

    def _subscribe(self, subscription: Dict) -> None:
        """Send a ``subscribe`` request and remember the subscription."""
        self.send_json({'op': 'subscribe', **subscription})
        self._subscriptions.append(subscription)

    def _unsubscribe(self, subscription: Dict) -> None:
        """Send an ``unsubscribe`` request and forget every matching record."""
        self.send_json({'op': 'unsubscribe', **subscription})
        while subscription in self._subscriptions:
            self._subscriptions.remove(subscription)

    def get_fills(self) -> List[Dict]:
        """Return a copy of the fills received so far (logs in/subscribes on demand)."""
        if not self._logged_in:
            self._login()
        subscription = {'channel': 'fills'}
        if subscription not in self._subscriptions:
            self._subscribe(subscription)
        return list(self._fills.copy())

    def get_orders(self) -> Dict[int, Dict]:
        """Return a copy of the latest order updates keyed by order id."""
        if not self._logged_in:
            self._login()
        subscription = {'channel': 'orders'}
        if subscription not in self._subscriptions:
            self._subscribe(subscription)
        return dict(self._orders.copy())

    def get_trades(self, market: str) -> List[Dict]:
        """Return a copy of the trade messages buffered for ``market``."""
        subscription = {'channel': 'trades', 'market': market}
        if subscription not in self._subscriptions:
            self._subscribe(subscription)
        return list(self._trades[market].copy())

    def get_orderbook(self, market: str) -> Dict[str, List[Tuple[float, float]]]:
        """Return the cached book for ``market`` as sorted ``(price, size)`` lists.

        Bids are sorted from highest to lowest price, asks from lowest to
        highest.  If no book data has been recorded yet, waits up to five
        seconds for the first update before building the snapshot.
        """
        subscription = {'channel': 'orderbook', 'market': market}
        if subscription not in self._subscriptions:
            self._subscribe(subscription)
        if self._orderbook_timestamps[market] == 0:
            self.wait_for_orderbook_update(market, 5)
        return {
            side: sorted(
                # list(...) snapshots the levels before iterating them.
                [(price, quantity)
                 for price, quantity in list(self._orderbooks[market][side].items())
                 if quantity],
                key=lambda order: order[0] * (-1 if side == 'bids' else 1)
            )
            for side in {'bids', 'asks'}
        }

    def get_orderbook_timestamp(self, market: str) -> float:
        """Return the ``time`` of the last book message applied (0.0 if none)."""
        return self._orderbook_timestamps[market]

    def wait_for_orderbook_update(self, market: str, timeout: Optional[float]) -> None:
        """Block until the next valid book update for ``market`` or ``timeout`` seconds."""
        subscription = {'channel': 'orderbook', 'market': market}
        if subscription not in self._subscriptions:
            self._subscribe(subscription)
        self._orderbook_update_events[market].wait(timeout)

    def get_ticker(self, market: str) -> Dict:
        """Return the most recent ticker payload for ``market`` (``{}`` if none yet)."""
        subscription = {'channel': 'ticker', 'market': market}
        if subscription not in self._subscriptions:
            self._subscribe(subscription)
        return self._tickers[market]

    def _handle_orderbook_message(self, message: Dict) -> None:
        """Apply a ``partial`` or incremental book message and verify its checksum.

        On a checksum mismatch the local book is discarded and the channel is
        re-subscribed; otherwise waiters on the market's update event are
        released.
        """
        market = message['market']
        subscription = {'channel': 'orderbook', 'market': market}
        if subscription not in self._subscriptions:
            return
        data = message['data']
        if data['action'] == 'partial':
            self._reset_orderbook(market)
        for side in {'bids', 'asks'}:
            book = self._orderbooks[market][side]
            for price, size in data[side]:
                if size:
                    book[price] = size
                else:
                    # A zero size removes the level; tolerate a removal for a
                    # level that is not in the local book instead of raising
                    # KeyError.
                    book.pop(price, None)
            self._orderbook_timestamps[market] = data['time']
        checksum = data['checksum']
        orderbook = self.get_orderbook(market)
        # CRC32 input: the best 100 bids and asks interleaved (bid, ask, bid,
        # ask, ...), each rendered as 'price:size' and joined with ':'.
        checksum_data = [
            ':'.join([f'{float(order[0])}:{float(order[1])}' for order in (bid, offer) if order])
            for (bid, offer) in zip_longest(orderbook['bids'][:100], orderbook['asks'][:100])
        ]
        computed_result = int(zlib.crc32(':'.join(checksum_data).encode()))
        if computed_result != checksum:
            self._last_received_orderbook_data_at = 0
            self._reset_orderbook(market)
            self._unsubscribe({'market': market, 'channel': 'orderbook'})
            self._subscribe({'market': market, 'channel': 'orderbook'})
        else:
            # set() followed by clear() wakes current waiters only.
            self._orderbook_update_events[market].set()
            self._orderbook_update_events[market].clear()

    def _handle_trades_message(self, message: Dict) -> None:
        """Append the message payload to the market's trade buffer."""
        self._trades[message['market']].append(message['data'])

    def _handle_ticker_message(self, message: Dict) -> None:
        """Replace the cached ticker for the message's market."""
        self._tickers[message['market']] = message['data']

    def _handle_fills_message(self, message: Dict) -> None:
        """Append the fill payload to the fills buffer."""
        self._fills.append(message['data'])

    def _handle_orders_message(self, message: Dict) -> None:
        """Store the order payload under its ``id``."""
        data = message['data']
        self._orders.update({data['id']: data})

    def _on_message(self, ws, raw_message: str) -> None:
        """Decode one raw message and route it to the matching channel handler.

        Raises:
            Exception: when the server sends a message of type ``error``; the
                decoded message is the exception argument.
        """
        message = json.loads(raw_message)
        message_type = message['type']
        if message_type in {'subscribed', 'unsubscribed'}:
            return
        if message_type == 'info':
            # Code 20001 triggers a reconnect.  Any other info message is
            # ignored: it carries no 'channel', so falling through used to
            # raise KeyError.
            if message.get('code') == 20001:
                self.reconnect()
            return
        if message_type == 'error':
            raise Exception(message)
        handlers = {
            'orderbook': self._handle_orderbook_message,
            'trades': self._handle_trades_message,
            'ticker': self._handle_ticker_message,
            'fills': self._handle_fills_message,
            'orders': self._handle_orders_message,
        }
        # .get() so channel-less messages (e.g. 'pong') are skipped quietly.
        handler = handlers.get(message.get('channel'))
        if handler is not None:
            handler(message)
import json
import time
from threading import Thread, Lock
from websocket import WebSocketApp
class WebsocketManager:
    """Own a single ``WebSocketApp`` connection and re-create it when it drops.

    Subclasses must implement ``_get_url`` and ``_on_message``.  The current
    connection lives in ``self.ws``; callbacks coming from any other (stale)
    connection object are ignored.
    """

    _CONNECT_TIMEOUT_S = 5

    def __init__(self):
        self.connect_lock = Lock()  # serialises connection attempts
        self.ws = None

    def _get_url(self):
        """Return the URL to connect to (must be overridden)."""
        raise NotImplementedError()

    def _on_message(self, ws, message):
        """Handle one incoming message (must be overridden)."""
        raise NotImplementedError()

    def send(self, message):
        """Send ``message`` over the socket, connecting first if necessary."""
        self.connect()
        self.ws.send(message)

    def send_json(self, message):
        """Serialise ``message`` with ``json.dumps`` and send it."""
        self.send(json.dumps(message))

    def _connect(self):
        """Start a new connection and wait up to ``_CONNECT_TIMEOUT_S`` for it.

        On return ``self.ws`` is either a connected app or ``None`` (the
        attempt timed out or the connection was torn down meanwhile).
        """
        assert not self.ws, "ws should be closed before attempting to connect"
        self.ws = WebSocketApp(
            self._get_url(),
            on_message=self._wrap_callback(self._on_message),
            on_close=self._wrap_callback(self._on_close),
            on_error=self._wrap_callback(self._on_error),
        )
        wst = Thread(target=self._run_websocket, args=(self.ws,))
        wst.daemon = True
        wst.start()
        # Wait for socket to connect
        started = time.monotonic()
        while True:
            # Snapshot: another thread may set self.ws to None at any moment.
            ws = self.ws
            if not ws:
                return
            sock = ws.sock
            if sock and sock.connected:
                return
            if time.monotonic() - started > self._CONNECT_TIMEOUT_S:
                self.ws = None
                # Close the abandoned app so its socket and thread are not
                # left running; its callbacks are already ignored because it
                # is no longer self.ws.
                ws.close()
                return
            time.sleep(0.1)

    def _wrap_callback(self, f):
        """Return ``f`` wrapped so that it only runs for the current connection.

        Errors raised by ``f`` are re-raised as ``Exception`` with a fixed
        prefix and the original error attached as the cause.
        """
        def wrapped_f(ws, *args, **kwargs):
            if ws is self.ws:
                try:
                    f(ws, *args, **kwargs)
                except Exception as e:
                    raise Exception(f'Error running websocket callback: {e}') from e
        return wrapped_f

    def _run_websocket(self, ws):
        """Thread target: run ``ws`` until it stops, then attempt a reconnect."""
        try:
            ws.run_forever()
        except Exception as e:
            raise Exception(f'Unexpected error while running websocket: {e}') from e
        finally:
            self._reconnect(ws)

    def _reconnect(self, ws):
        """Replace ``ws`` with a fresh connection if it is still the current one."""
        assert ws is not None, '_reconnect should only be called with an existing ws'
        if ws is self.ws:
            self.ws = None
            ws.close()
            self.connect()

    def connect(self):
        """Ensure there is a connection, retrying until one is established."""
        if self.ws:
            return
        with self.connect_lock:
            while not self.ws:
                self._connect()
                if self.ws:
                    return

    def _on_close(self, ws, *args):
        """Reconnect after the socket closes.

        ``*args`` absorbs the close status code and message that newer
        websocket-client releases pass to ``on_close``; older releases pass
        only ``ws``.
        """
        self._reconnect(ws)

    def _on_error(self, ws, error):
        """Reconnect after a socket error."""
        self._reconnect(ws)

    def reconnect(self) -> None:
        """Force a reconnect of the current connection, if there is one."""
        if self.ws is not None:
            self._reconnect(self.ws)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment