@normanlmfung
Last active October 16, 2024 05:02
aggregated_orderbook.py
from enum import Enum
import time
from datetime import datetime
from typing import Dict, List
import logging
from tabulate import tabulate
import asyncio
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
'''
Spot order book REST APIs from the exchanges (a minimal raw-REST sketch follows the imports below):
    Binance  https://binance-docs.github.io/apidocs/spot/en/#order-book
    OKX      https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-order-book
             'sz': order book depth per side, maximum 400 (i.e. up to 400 bids + 400 asks)
    Bybit    https://bybit-exchange.github.io/docs/v5/market/orderbook
    Coinbase https://docs.cdp.coinbase.com/exchange/reference/exchangerestapi_getproductbook
    Kraken   https://docs.kraken.com/api/docs/rest-api/get-order-book/
'''
from ccxt.binance import binance
from ccxt.okx import okx
from ccxt.bybit import bybit
from ccxt.coinbase import coinbase
from ccxt.kraken import kraken
from ccxt.base.exchange import Exchange
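# A minimal sketch of hitting one of the raw REST endpoints listed above directly, bypassing ccxt.
# The endpoint and query parameters follow Binance's public spot docs; the helper name and the use
# of the 'requests' library are illustrative assumptions and are not used anywhere else in this script.
def _example_binance_depth_rest(symbol : str = 'BTCUSDT', limit : int = 100) -> Dict:
    import requests  # local import: only needed for this illustrative helper
    resp = requests.get('https://api.binance.com/api/v3/depth', params={'symbol' : symbol, 'limit' : limit}, timeout=10)
    resp.raise_for_status()
    # Response shape: {'lastUpdateId': ..., 'bids': [[price, qty], ...], 'asks': [[price, qty], ...]}
    return resp.json()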
param : Dict = {
    'normalized_symbol' : 'BTC/USDT',
    'market_type' : 'spot', # Check the ccxt docs: for most exchanges perpetuals are 'linear' or 'swap'. Example: https://github.com/ccxt/ccxt/blob/master/python/ccxt/okx.py?plain=1#L1110
    'depth' : 1000,
    'price_level_increment' : 10,
    'sliding_window_num_intervals' : 90, # For example, if each iteration takes 2 sec, 90 intervals = 180 sec (i.e. three minutes)
    'update_imbalance_csv_intervals' : 100,
    # Provider ID is part of the mds publish topic.
    'provider_id' : 1,
    # Publish to message bus
    'mds' : {
        'mds_topic' : 'ccxt_ws_ob_$PROVIDER_ID$',
        'redis' : {
            'host' : 'localhost',
            'port' : 6379,
            'db' : 0,
            'ttl_ms' : 1000*60
        }
    },
    # Latency guard: ts_delta_observation_ms tracks the local server clock vs the timestamp reported by the exchange.
    'ts_delta_observation_ms_threshold' : 150
}
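# A minimal sketch of how the 'mds' block above could be used to publish aggregated snapshots to
# Redis. The helper name, the payload shape and the choice of a keyed SET with a millisecond TTL
# (rather than pub/sub) are illustrative assumptions; this script itself never publishes.
def _example_publish_to_mds(aggregated_orderbook : Dict) -> None:
    import json
    import redis  # local import: only needed for this illustrative helper
    mds_cfg = param['mds']
    topic = mds_cfg['mds_topic'].replace('$PROVIDER_ID$', str(param['provider_id']))
    r = redis.Redis(host=mds_cfg['redis']['host'], port=mds_cfg['redis']['port'], db=mds_cfg['redis']['db'])
    # px is the TTL in milliseconds, so stale snapshots expire on their own.
    r.set(topic, json.dumps(aggregated_orderbook), px=mds_cfg['redis']['ttl_ms'])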
normalized_symbol : str = param['normalized_symbol']
depth : int = param['depth']
market_type : str = param['market_type']
price_level_increment : float = param['price_level_increment']
sliding_window_num_intervals : int = param['sliding_window_num_intervals']
update_imbalance_csv_intervals : int = param['update_imbalance_csv_intervals']
param['job_name'] = f'ccxt_agg_ob_provider_{normalized_symbol.replace(":","_").replace("/","-")}'
logging.Formatter.converter = time.gmtime
logger = logging.getLogger()
log_level = logging.INFO # DEBUG --> INFO --> WARNING --> ERROR
logger.setLevel(log_level)
format_str = '%(asctime)s %(message)s'
formatter = logging.Formatter(format_str)
sh = logging.StreamHandler()
sh.setLevel(log_level)
sh.setFormatter(formatter)
logger.addHandler(sh)
# fh = logging.FileHandler(f"{param['job_name']}.log")
# fh.setLevel(log_level)
# fh.setFormatter(formatter)
# logger.addHandler(fh)
coinbase_param : Dict[str, int] = { }
binance_param : Dict[str, int] = {
    'limit' : depth if depth <= 5000 else 5000
}
okx_param : Dict[str, int] = {
    'sz' : depth if depth <= 400 else 400
}
bybit_param : Dict[str, int] = {
    'limit' : depth if depth <= 200 else 200
}
kraken_param : Dict[str, int] = {
    'depth' : depth if depth <= 500 else 500
}
# ccxt reads the market type from the 'options' dict (e.g. {'options': {'defaultType': 'swap'}});
# for 'spot' this matches the default anyway, but routing it through 'options' keeps the param effective.
coinbase_exchange = coinbase({
    'options' : { 'defaultType' : market_type }
})
binance_exchange = binance({
    'options' : { 'defaultType' : market_type }
})
okx_exchange = okx({
    'options' : { 'defaultType' : market_type }
})
bybit_exchange = bybit({
    'options' : { 'defaultType' : market_type }
})
kraken_exchange = kraken({
    'options' : { 'defaultType' : market_type }
})
class LogLevel(Enum):
    CRITICAL = 50
    ERROR = 40
    WARNING = 30
    INFO = 20
    DEBUG = 10
    NOTSET = 0
def log(message : str, log_level : LogLevel = LogLevel.INFO):
    if log_level.value < LogLevel.WARNING.value:
        logger.info(f"{datetime.now()} {message}")
    elif log_level.value == LogLevel.WARNING.value:
        logger.warning(f"{datetime.now()} {message}")
    elif log_level.value == LogLevel.ERROR.value:
        logger.error(f"{datetime.now()} {message}")
async def _fetch_orderbook(symbol : str, exchange : Exchange, fetch_ob_params : Dict):
    # Note: this is the synchronous ccxt client, so fetch_order_book blocks; asyncio.gather in
    # main() therefore runs the fetches one after another rather than concurrently.
    try:
        ob = exchange.fetch_order_book(symbol=symbol, params=fetch_ob_params)
        is_valid = True
        if 'timestamp' in ob and ob['timestamp']:
            update_ts_ms = ob['timestamp']
            ts_delta_observation_ms = int(datetime.now().timestamp()*1000) - update_ts_ms
            is_valid = ts_delta_observation_ms <= param['ts_delta_observation_ms_threshold']
        bid_prices = [ x[0] for x in ob['bids'] ]
        ask_prices = [ x[0] for x in ob['asks'] ]
        min_bid_price = min(bid_prices)
        max_bid_price = max(bid_prices)
        min_ask_price = min(ask_prices)
        max_ask_price = max(ask_prices)
        mid = (max_bid_price + min_ask_price) / 2
        log(f"{exchange.name} mid: {mid}, min_bid_price: {min_bid_price}, max_bid_price: {max_bid_price}, min_ask_price: {min_ask_price}, max_ask_price: {max_ask_price}, range: {max_ask_price-min_bid_price}")
        return {
            'source' : exchange.name,
            'orderbook' : ob,
            'mid' : mid,
            'min_bid_price' : min_bid_price,
            'max_bid_price' : max_bid_price,
            'min_ask_price' : min_ask_price,
            'max_ask_price' : max_ask_price,
            'is_valid' : is_valid
        }
    except Exception as fetch_err:
        log(f"_fetch_orderbook failed for {exchange.name}: {fetch_err}", LogLevel.ERROR)
        return {
            'source' : exchange.name,
            'is_valid' : False
        }
async def main():
    pd_imbalances = pd.DataFrame(columns=['timestamp_ms', 'datetime', 'mid', 'imbalance', 'total_amount', 'pct_imbalance', 'ema_pct_imbalance'])
    last_ema_pct_imbalance = None
    i = 0
    while True:
        loop_start = time.time()
        try:
            orderbooks = await asyncio.gather(
                _fetch_orderbook(symbol=normalized_symbol, exchange=coinbase_exchange, fetch_ob_params=coinbase_param),
                _fetch_orderbook(symbol=normalized_symbol, exchange=binance_exchange, fetch_ob_params=binance_param),
                _fetch_orderbook(symbol=normalized_symbol, exchange=bybit_exchange, fetch_ob_params=bybit_param),
                _fetch_orderbook(symbol=normalized_symbol, exchange=kraken_exchange, fetch_ob_params=kraken_param),
                _fetch_orderbook(symbol=normalized_symbol, exchange=okx_exchange, fetch_ob_params=okx_param)
            )
            valid_orderbooks = [ ob for ob in orderbooks if ob['is_valid'] ]
            invalid_orderbooks = [ ob for ob in orderbooks if not ob['is_valid'] ]
            invalid_orderbooks_names = " ".join([ ob['source'] for ob in invalid_orderbooks ])
            # Overlapping price range across the valid books: only levels inside
            # (max_min_bid_price, min_max_ask_price) are aggregated further down.
            max_min_bid_price = max([ ob['min_bid_price'] for ob in valid_orderbooks ])
            best_bid_price = max([ ob['max_bid_price'] for ob in valid_orderbooks ])
            min_max_ask_price = min([ ob['max_ask_price'] for ob in valid_orderbooks ])
            best_ask_price = min([ ob['min_ask_price'] for ob in valid_orderbooks ])
            elapsed_ms = (time.time() - loop_start) * 1000
            logger.info(f"orderbooks fetch elapsed (ms): {elapsed_ms}, # orderbooks: {len(valid_orderbooks)}, max_min_bid_price: {max_min_bid_price}, min_max_ask_price: {min_max_ask_price}, best_bid_price: {best_bid_price}, best_ask_price: {best_ask_price}. Invalid books: {invalid_orderbooks_names}")
            aggregated_orderbooks = {
                'bids' : {},
                'asks' : {}
            }
            def round_to_nearest(price, increment):
                return round(price / increment) * increment
            # Use OKX as the mid reference; fall back to the first valid book if the OKX fetch failed.
            okx_mids = [ x['mid'] for x in valid_orderbooks if x['source'] == 'OKX' ]
            mid = okx_mids[0] if okx_mids else valid_orderbooks[0]['mid']
            for orderbook in valid_orderbooks:
                bids = orderbook['orderbook']['bids']
                asks = orderbook['orderbook']['asks']
                for bid in bids:
                    price = round_to_nearest(bid[0], price_level_increment)
                    amount = bid[1]
                    if bid[0] > max_min_bid_price:
                        existing_amount = 0
                        if price in aggregated_orderbooks['bids']:
                            existing_amount = aggregated_orderbooks['bids'][price]['amount']
                        amount_in_base_ccy = existing_amount + amount
                        amount_in_usdt = amount_in_base_ccy * mid
                        aggregated_orderbooks['bids'][price] = {
                            'price' : price,
                            'amount' : amount_in_base_ccy,
                            'amount_usdt' : amount_in_usdt
                        }
                for ask in asks:
                    price = round_to_nearest(ask[0], price_level_increment)
                    amount = ask[1]
                    if ask[0] < min_max_ask_price:
                        existing_amount = 0
                        if price in aggregated_orderbooks['asks']:
                            existing_amount = aggregated_orderbooks['asks'][price]['amount']
                        amount_in_base_ccy = existing_amount + amount
                        amount_in_usdt = amount_in_base_ccy * mid
                        aggregated_orderbooks['asks'][price] = {
                            'price' : price,
                            'amount' : amount_in_base_ccy,
                            'amount_usdt' : amount_in_usdt
                        }
            sorted_asks = dict(sorted(aggregated_orderbooks['asks'].items(), key=lambda item: item[0], reverse=True))
            sorted_bids = dict(sorted(aggregated_orderbooks['bids'].items(), key=lambda item: item[0], reverse=True))
            pd_aggregated_orderbooks_asks = pd.DataFrame(sorted_asks).transpose()
            pd_aggregated_orderbooks_bids = pd.DataFrame(sorted_bids).transpose()
            sum_asks_amount_usdt = pd.to_numeric(pd_aggregated_orderbooks_asks['amount_usdt']).sum()
            sum_bids_amount_usdt = pd.to_numeric(pd_aggregated_orderbooks_bids['amount_usdt']).sum()
            pd_aggregated_orderbooks_asks['str_amount_usdt'] = pd_aggregated_orderbooks_asks['amount_usdt'].apply(lambda x: f'{x:,.2f}')
            pd_aggregated_orderbooks_bids['str_amount_usdt'] = pd_aggregated_orderbooks_bids['amount_usdt'].apply(lambda x: f'{x:,.2f}')
            # Flag the price level carrying the largest USDT amount: resistance on the ask side, support on the bid side.
            ask_resistance_price_level = pd_aggregated_orderbooks_asks['amount_usdt'].idxmax()
            bid_support_price_level = pd_aggregated_orderbooks_bids['amount_usdt'].idxmax()
            pd_aggregated_orderbooks_asks['is_max_amount_usdt'] = pd_aggregated_orderbooks_asks.index == ask_resistance_price_level
            pd_aggregated_orderbooks_bids['is_max_amount_usdt'] = pd_aggregated_orderbooks_bids.index == bid_support_price_level
            # Select and rename on a fresh frame (chained, no inplace) to avoid pandas' SettingWithCopyWarning.
            pd_aggregated_orderbooks_asks_ = pd_aggregated_orderbooks_asks[['price', 'amount', 'str_amount_usdt', 'is_max_amount_usdt']].rename(columns={'str_amount_usdt': 'amount_usdt'})
            pd_aggregated_orderbooks_bids_ = pd_aggregated_orderbooks_bids[['price', 'amount', 'str_amount_usdt', 'is_max_amount_usdt']].rename(columns={'str_amount_usdt': 'amount_usdt'})
            spread_bps = round((best_ask_price - best_bid_price) / mid * 10000, 0)
            imbalance = sum_bids_amount_usdt - sum_asks_amount_usdt
            total_amount = sum_bids_amount_usdt + sum_asks_amount_usdt
            pct_imbalance = (imbalance / total_amount) * 100
            # Record this iteration inside the try block so a failed fetch cannot leave these names undefined.
            pd_imbalances.loc[i] = [ int(loop_start*1000), datetime.fromtimestamp(loop_start), mid, imbalance, total_amount, pct_imbalance, np.nan ]
            log(f"mid: {mid}, imbalance (bids - asks): {imbalance:,.0f}, pct_imbalance: {pct_imbalance:,.2f}, last_ema_pct_imbalance: {last_ema_pct_imbalance if last_ema_pct_imbalance else '--'}, spread_bps between bests: {spread_bps} (if < 0, arb opportunity). Range {max_min_bid_price} - {min_max_ask_price} (${int(min_max_ask_price-max_min_bid_price)})")
            log(f"asks USD {sum_asks_amount_usdt:,.0f}, best: {best_ask_price:,.2f}")
            log(f"{tabulate(pd_aggregated_orderbooks_asks_.reset_index(drop=True), headers='keys', tablefmt='psql', colalign=('right', 'right', 'right'), showindex=False)}")
            log(f"bids USD {sum_bids_amount_usdt:,.0f}, best: {best_bid_price:,.2f}")
            log(f"{tabulate(pd_aggregated_orderbooks_bids_.reset_index(drop=True), headers='keys', tablefmt='psql', colalign=('right', 'right', 'right'), showindex=False)}")
        except Exception as loop_err:
            log(f"#{i} Error: {loop_err}", LogLevel.ERROR)
        finally:
            log(f"#{i} loop elapsed (ms): {(time.time()-loop_start)*1000}")
            if i % update_imbalance_csv_intervals == 0:
                if pd_imbalances.shape[0] > sliding_window_num_intervals:
                    # EMA of the per-iteration imbalance percentage over the sliding window.
                    pd_imbalances['ema_pct_imbalance'] = pd_imbalances['pct_imbalance'].ewm(span=sliding_window_num_intervals, adjust=False).mean()
                    last_ema_pct_imbalance = pd_imbalances['ema_pct_imbalance'].iloc[-1]
                pd_imbalances.to_csv("imbalances.csv")
            i += 1
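# A minimal sketch of reading imbalances.csv back and plotting the imbalance series with the
# matplotlib/seaborn imports above. The function name and figure layout are illustrative
# assumptions; main() only writes the CSV, it never plots.
def _example_plot_imbalances(csv_path : str = "imbalances.csv"):
    pd_hist = pd.read_csv(csv_path, parse_dates=['datetime'])
    sns.set_style('whitegrid')
    fig, ax = plt.subplots(figsize=(12, 5))
    ax.plot(pd_hist['datetime'], pd_hist['pct_imbalance'], label='pct_imbalance', alpha=0.4)
    ax.plot(pd_hist['datetime'], pd_hist['ema_pct_imbalance'], label='ema_pct_imbalance')
    ax.axhline(0, color='grey', linewidth=1)
    ax.set_ylabel('% imbalance (bids - asks)')
    ax.legend()
    plt.show()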
asyncio.run(main())