from enum import Enum
import time
from datetime import datetime
from typing import Dict, List
import logging
from tabulate import tabulate
import asyncio
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# Spot order book REST APIs of the exchanges used below.
# OKX 'sz': order book depth per side. Maximum 400, e.g. 400 bids + 400 asks.
from ccxt.base.exchange import Exchange
from ccxt.binance import binance
from ccxt.bybit import bybit
from ccxt.coinbase import coinbase
from ccxt.kraken import kraken
from ccxt.okx import okx
# Run-time configuration of the aggregated order book provider.
param : Dict = {
    'normalized_symbol' : 'BTC/USDT',
    # See the ccxt docs for valid values; for most exchanges perpetuals are 'linear' or 'swap'.
    'market_type' : 'spot',
    # Requested number of price levels per side; clamped per exchange further below.
    'depth' : 1000,
    # Width of the price buckets the aggregated book is rounded to.
    'price_level_increment' : 10,
    # For example, if each iteration takes 2 sec, 90 intervals = 180 sec (i.e. three minutes).
    'sliding_window_num_intervals' : 90,
    'update_imabalce_csv_intervals' : 100,

    # Provider ID is part of mds publish topic.
    'provider_id' : 1,

    # Publish to message bus
    'mds' : {
        'mds_topic' : 'ccxt_ws_ob_$PROVIDER_ID$',
        'redis' : {
            'host' : 'localhost',
            'port' : 6379,
            'db' : 0,
            'ttl_ms' : 1000*60
        }
    },

    # Keep track of latency issues: ts_delta_observation_ms is the server clock
    # minus the timestamp reported by the exchange.
    'ts_delta_observation_ms_threshold' : 150
}

# Module-level shortcuts for the settings read most often.
normalized_symbol : str = param['normalized_symbol']
depth : int = param['depth']
market_type : str = param['market_type']
price_level_increment : float = param['price_level_increment']
sliding_window_num_intervals : int = param['sliding_window_num_intervals']
update_imabalce_csv_intervals : int = param['update_imabalce_csv_intervals']

# ':' and '/' are replaced so the job name contains neither character.
param['job_name'] = f'ccxt_agg_ob_provider_{normalized_symbol.replace(":","_").replace("/","-")}'
# Render %(asctime)s in UTC instead of local time.
logging.Formatter.converter = time.gmtime
logger = logging.getLogger()
log_level = logging.INFO # DEBUG --> INFO --> WARNING --> ERROR
# Without an explicit level the root logger stays at WARNING and INFO records are dropped.
logger.setLevel(log_level)
format_str = '%(asctime)s %(message)s'
formatter = logging.Formatter(format_str)

# Console output: the handler has to be attached, otherwise the formatter above is never used.
sh = logging.StreamHandler()
sh.setLevel(log_level)
sh.setFormatter(formatter)
logger.addHandler(sh)

# Optional file output, disabled by default.
# fh = logging.FileHandler(f"{param['job_name']}.log")
# fh.setLevel(log_level)
# fh.setFormatter(formatter)
# logger.addHandler(fh)
# Extra parameters passed to fetch_order_book for each exchange.
# The requested depth is clamped to the per-exchange ceiling used by this
# script; the name of the depth parameter differs per exchange.
coinbase_param : Dict[str, int] = {}
binance_param : Dict[str, int] = {
    'limit' : min(depth, 5000)
}
okx_param : Dict[str, int] = {
    'sz' : min(depth, 400)
}
bybit_param : Dict[str, int] = {
    'limit' : min(depth, 200)
}
kraken_param : Dict[str, int] = {
    'depth' : min(depth, 500)
}
# One ccxt client per exchange, all configured with the same market type.
# NOTE(review): ccxt documents 'defaultType' as an entry of the 'options' dict
# ({'options': {'defaultType': ...}}); confirm that the top-level key used here
# has the intended effect.
coinbase_exchange = coinbase({
    'defaultType' : market_type
})
binance_exchange = binance({
    'defaultType' : market_type
})
okx_exchange = okx({
    'defaultType' : market_type
})
bybit_exchange = bybit({
    'defaultType' : market_type
})
kraken_exchange = kraken({
    'defaultType' : market_type
})
class LogLevel(Enum):
    """Severity levels accepted by log().

    The values are the numeric levels of the standard logging module.
    WARNING is included because log() branches on it.
    """
    ERROR = 40
    WARNING = 30
    INFO = 20
    DEBUG = 10
def log(message : str, log_level : LogLevel = LogLevel.INFO):
    """Write message to the module logger at the requested severity.

    Levels below WARNING (DEBUG and INFO) are emitted as INFO, WARNING as a
    warning, and ERROR or anything above it as an error.

    Args:
        message: Text to log.
        log_level: Severity of the message; defaults to LogLevel.INFO.
    """
    # NOTE(review): the prefix placeholder was empty in the reviewed source;
    # the local wall-clock time is assumed here. It is printed in addition to
    # the handler's UTC %(asctime)s.
    line = f"{datetime.now()} {message}"

    # Compare against the logging module's numeric levels so this function does
    # not depend on which members LogLevel happens to define.
    severity = log_level.value
    if severity < logging.WARNING:
        logger.info(line)
    elif severity < logging.ERROR:
        logger.warning(line)
    else:
        logger.error(line)
async def _fetch_orderbook(symbol : str, exchange : Exchange, fetch_ob_params : Dict):
    """Fetch one order book and summarise its price range.

    Args:
        symbol: Normalized market symbol, e.g. 'BTC/USDT'.
        exchange: ccxt client to query.
        fetch_ob_params: Exchange-specific parameters forwarded as `params`.

    Returns:
        On success a dict with keys 'source', 'orderbook', 'mid',
        'min_bid_price', 'max_bid_price', 'min_ask_price', 'max_ask_price'
        and 'is_valid'. 'is_valid' is False when the book carries a timestamp
        that lags the local clock by more than
        param['ts_delta_observation_ms_threshold'] milliseconds.
        On any failure a dict with only 'source' and 'is_valid' (False);
        this function does not raise for fetch or parsing errors.
    """
    try:
        # fetch_order_book is a blocking call here (it is not awaited and the
        # result is used as a dict). Run it in the default executor so the
        # fetches started by asyncio.gather overlap instead of running in turn.
        loop = asyncio.get_running_loop()
        ob = await loop.run_in_executor(
            None,
            lambda: exchange.fetch_order_book(symbol=symbol, params=fetch_ob_params)
        )

        # Books without a timestamp cannot be checked for staleness and are accepted.
        is_valid = True
        if 'timestamp' in ob and ob['timestamp']:
            update_ts_ms = ob['timestamp']
            ts_delta_observation_ms = int(time.time()*1000) - update_ts_ms
            is_valid = ts_delta_observation_ms <= param['ts_delta_observation_ms_threshold']

        bid_prices = [ x[0] for x in ob['bids'] ]
        ask_prices = [ x[0] for x in ob['asks'] ]
        if not bid_prices or not ask_prices:
            raise ValueError("order book has an empty side")

        min_bid_price = min(bid_prices)
        max_bid_price = max(bid_prices)
        min_ask_price = min(ask_prices)
        max_ask_price = max(ask_prices)
        # Mid is the midpoint between best bid and best ask.
        mid = (max_bid_price + min_ask_price) / 2

        log(f"{exchange.name} mid: {mid}, min_bid_price: {min_bid_price}, max_bid_price: {max_bid_price}, min_ask_price: {min_ask_price}, max_ask_price: {max_ask_price}, range: {max_ask_price-min_bid_price}")

        return {
            'source' : exchange.name,
            'orderbook' : ob,
            'mid' : mid,
            'min_bid_price' : min_bid_price,
            'max_bid_price' : max_bid_price,
            'min_ask_price' : min_ask_price,
            'max_ask_price' : max_ask_price,
            'is_valid' : is_valid
        }
    except Exception as fetch_err:
        # Best effort: one failing exchange must not abort the whole gather.
        log(f"_fetch_orderbook failed for {exchange.name}: {fetch_err}", LogLevel.ERROR)
        return {
            'source' : exchange.name,
            'is_valid' : False
        }
def _round_to_nearest(price, increment):
    """Snap price to the nearest multiple of increment."""
    return round(price / increment) * increment


def _aggregate_levels(orderbooks, side, keep_level, increment, mid):
    """Merge one side ('bids' or 'asks') of several books into price buckets.

    Levels whose raw price fails keep_level are skipped. Returns a dict keyed
    by bucket price with 'price', 'amount' (base currency) and 'amount_usdt'
    (amount * mid).
    """
    levels = {}
    for orderbook in orderbooks:
        for level in orderbook['orderbook'][side]:
            raw_price, amount = level[0], level[1]
            if not keep_level(raw_price):
                continue
            price = _round_to_nearest(raw_price, increment)
            total = amount
            if price in levels:
                total = levels[price]['amount'] + amount
            levels[price] = {
                'price' : price,
                'amount' : total,
                'amount_usdt' : total * mid
            }
    return levels


def _levels_to_frame(levels):
    """Build a DataFrame of the buckets, highest price first, flagging the largest one."""
    if not levels:
        raise ValueError("no price levels inside the common price range")
    ordered = dict(sorted(levels.items(), key=lambda item: item[0], reverse=True))
    frame = pd.DataFrame(ordered).transpose()
    frame['amount_usdt'] = pd.to_numeric(frame['amount_usdt'])
    frame['str_amount_usdt'] = frame['amount_usdt'].apply(lambda x: f'{x:,.2f}')
    frame['is_max_amount_usdt'] = frame.index == frame['amount_usdt'].idxmax()
    return frame


def _display_frame(frame):
    """Return the columns to print, with the formatted amount shown as 'amount_usdt'."""
    # rename() without inplace returns a new frame, so the slice is never mutated.
    return frame[['price', 'amount', 'str_amount_usdt', 'is_max_amount_usdt']].rename(columns={'str_amount_usdt': 'amount_usdt'})


async def main():
    """Poll all exchanges forever, aggregate their books and log the bid/ask imbalance.

    Each iteration fetches the five order books, keeps the valid ones, buckets
    the levels inside the price range covered by every valid book, logs both
    sides as tables and records the imbalance. A failing iteration is logged
    and skipped; the loop itself never returns.
    """
    pd_imbalances = pd.DataFrame(columns=['timestamp_ms', 'datetime', 'mid', 'imbalance', 'total_amount', 'pct_imbalance', 'ema_pct_imbalance'])
    last_ema_pct_imbalance = None

    i = 0
    while True:
        loop_start = time.time()
        try:
            orderbooks = await asyncio.gather(
                _fetch_orderbook(symbol=normalized_symbol, exchange=coinbase_exchange, fetch_ob_params=coinbase_param),
                _fetch_orderbook(symbol=normalized_symbol, exchange=binance_exchange, fetch_ob_params=binance_param),
                _fetch_orderbook(symbol=normalized_symbol, exchange=bybit_exchange, fetch_ob_params=bybit_param),
                _fetch_orderbook(symbol=normalized_symbol, exchange=kraken_exchange, fetch_ob_params=kraken_param),
                _fetch_orderbook(symbol=normalized_symbol, exchange=okx_exchange, fetch_ob_params=okx_param)
            )
            valid_orderbooks = [ ob for ob in orderbooks if ob['is_valid'] ]
            invalid_orderbooks_names = " ".join(ob['source'] for ob in orderbooks if not ob['is_valid'])
            if not valid_orderbooks:
                raise ValueError(f"no valid orderbooks. Invalid books: {invalid_orderbooks_names}")

            # Only the price range present in every valid book is aggregated:
            # from the highest "deepest bid" up to the lowest "deepest ask".
            max_min_bid_price = max(ob['min_bid_price'] for ob in valid_orderbooks)
            best_bid_price = max(ob['max_bid_price'] for ob in valid_orderbooks)
            min_max_ask_price = min(ob['max_ask_price'] for ob in valid_orderbooks)
            best_ask_price = min(ob['min_ask_price'] for ob in valid_orderbooks)

            elapsed_ms = (time.time() - loop_start) * 1000
            log(f"orderbooks fetch elapsed (ms): {elapsed_ms}, # orderbooks: {len(valid_orderbooks)}, max_min_bid_price: {max_min_bid_price}, min_max_ask_price: {min_max_ask_price}, best_bid_price: {best_bid_price}, best_ask_price: {best_ask_price}. Invalid books: {invalid_orderbooks_names}")

            # Use OKX as mid reference; fall back to the first valid book when
            # OKX is missing from this iteration.
            mid = next(
                (ob['mid'] for ob in valid_orderbooks if ob['source'] == 'OKX'),
                valid_orderbooks[0]['mid']
            )

            aggregated_orderbooks = {
                'bids' : _aggregate_levels(valid_orderbooks, 'bids', lambda p: p > max_min_bid_price, price_level_increment, mid),
                'asks' : _aggregate_levels(valid_orderbooks, 'asks', lambda p: p < min_max_ask_price, price_level_increment, mid)
            }

            pd_aggregated_orderbooks_asks = _levels_to_frame(aggregated_orderbooks['asks'])
            pd_aggregated_orderbooks_bids = _levels_to_frame(aggregated_orderbooks['bids'])

            sum_asks_amount_usdt = pd_aggregated_orderbooks_asks['amount_usdt'].sum()
            sum_bids_amount_usdt = pd_aggregated_orderbooks_bids['amount_usdt'].sum()

            pd_aggregated_orderbooks_asks_ = _display_frame(pd_aggregated_orderbooks_asks)
            pd_aggregated_orderbooks_bids_ = _display_frame(pd_aggregated_orderbooks_bids)

            spread_bps = (best_ask_price-best_bid_price) / mid * 10000
            spread_bps = round(spread_bps, 0)

            imbalance = sum_bids_amount_usdt - sum_asks_amount_usdt
            total_amount = sum_bids_amount_usdt + sum_asks_amount_usdt
            pct_imbalance = (imbalance/total_amount) * 100

            # 'is not None' so that an EMA of exactly 0.0 is still printed.
            log(f"mid: {mid}, imbalance (bids - asks): {imbalance:,.0f}, pct_imbalance: {pct_imbalance:,.2f}, last_ema_pct_imbalance: {last_ema_pct_imbalance if last_ema_pct_imbalance is not None else '--'}, spread_bps between bests: {spread_bps} (If < 0, arb opportunity). Range {max_min_bid_price} - {min_max_ask_price} (${int(min_max_ask_price-max_min_bid_price)})")

            log(f"asks USD {sum_asks_amount_usdt:,.0f}, best: {best_ask_price:,.2f}")
            log(f"{tabulate(pd_aggregated_orderbooks_asks_.reset_index(drop=True), headers='keys', tablefmt='psql', colalign=('right', 'right', 'right'), showindex=False)}")

            log(f"bids USD {sum_bids_amount_usdt:,.0f}, best: {best_bid_price:,.2f}")
            log(f"{tabulate(pd_aggregated_orderbooks_bids_.reset_index(drop=True), headers='keys', tablefmt='psql', colalign=('right', 'right', 'right'), showindex=False)}")

            # Record only iterations that completed, so no stale or undefined
            # values from a failed iteration end up in the history.
            pd_imbalances.loc[i] = [ int(loop_start*1000), datetime.fromtimestamp(loop_start), mid, imbalance, total_amount, pct_imbalance, np.nan ]

            if i%update_imabalce_csv_intervals==0:
                # NOTE(review): the body of this branch is missing from the
                # reviewed source; the output file name is an assumption derived
                # from param['job_name'] - confirm.
                pd_imbalances.to_csv(f"{param['job_name']}_imbalances.csv")

            if pd_imbalances.shape[0]>sliding_window_num_intervals:
                # Rows are appended to an initially empty frame, so coerce the
                # column to numeric before computing the EMA.
                pd_imbalances['ema_pct_imbalance'] = pd.to_numeric(pd_imbalances['pct_imbalance']).ewm(span=sliding_window_num_intervals, adjust=False).mean()
                last_ema_pct_imbalance = pd_imbalances['ema_pct_imbalance'].iloc[-1]

        except Exception as loop_err:
            # Keep polling: a failed iteration is reported and skipped.
            log(f"#{i} Error: {loop_err}", LogLevel.ERROR)

        log(f"#{i} loop elapsed (ms): {(time.time()-loop_start)*1000}")
        i += 1
