# aggregated_orderbook.py
# Source: GitHub gist normanlmfung/5000e4574ce1f6d657628c871913aa82
# NOTE: GitHub flagged this file as possibly containing bidirectional Unicode text, which may be
# interpreted or compiled differently than it appears. Review it in an editor that reveals hidden
# Unicode characters.
from enum import Enum | |
import time | |
from datetime import datetime | |
from typing import Dict, List | |
import logging | |
from tabulate import tabulate | |
import asyncio | |
import pandas as pd | |
import numpy as np | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
''' | |
Spot orderbooks REST API from exchanges | |
Binance https://binance-docs.github.io/apidocs/spot/en/#order-book | |
OKX https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-order-book | |
'sz': Order book depth per side. Maximum 400, e.g. 400 bids + 400 asks | |
Bybit https://bybit-exchange.github.io/docs/v5/market/orderbook | |
Coinbase https://docs.cdp.coinbase.com/exchange/reference/exchangerestapi_getproductbook | |
Kraken https://docs.kraken.com/api/docs/rest-api/get-order-book/ | |
''' | |
from ccxt.binance import binance | |
from ccxt.okx import okx | |
from ccxt.bybit import bybit | |
from ccxt.coinbase import coinbase | |
from ccxt.kraken import kraken | |
from ccxt.base.exchange import Exchange | |
# Runtime configuration for the aggregated order-book poller.
param : Dict = {
    'normalized_symbol': 'BTC/USDT',
    # ccxt market type. 'spot' here; for perpetuals most exchanges use 'linear' or 'swap'
    # (check the ccxt exchange source, e.g. https://github.com/ccxt/ccxt/blob/master/python/ccxt/okx.py?plain=1#L1110).
    'market_type': 'spot',
    # Requested order-book depth (levels per side).
    'depth': 1000,
    # Price-bucket width used when merging levels from several venues.
    'price_level_increment': 10,
    # EMA span, counted in loop iterations: at ~2 s per iteration, 90 intervals ~ 180 s (three minutes).
    'sliding_window_num_intervals': 90,
    # Recompute the EMA and rewrite the imbalance CSV every N iterations (key spelling is relied on elsewhere).
    'update_imabalce_csv_intervals': 100,
    # Provider ID is part of the mds publish topic (see the $PROVIDER_ID$ placeholder).
    'provider_id': 1,
    # Message-bus publish settings (Redis); appears unused by this script.
    'mds': {
        'mds_topic': 'ccxt_ws_ob_$PROVIDER_ID$',
        'redis': {
            'host': 'localhost',
            'port': 6379,
            'db': 0,
            'ttl_ms': 60 * 1000,
        },
    },
    # Max tolerated lag (ms) of the exchange's book timestamp behind the local clock.
    'ts_delta_observation_ms_threshold': 150,
}

# Frequently used settings, unpacked into module-level names.
normalized_symbol : str = param['normalized_symbol']
market_type : str = param['market_type']
depth : int = param['depth']
price_level_increment : float = param['price_level_increment']
sliding_window_num_intervals : int = param['sliding_window_num_intervals']
update_imabalce_csv_intervals : int = param['update_imabalce_csv_intervals']

# e.g. 'BTC/USDT' -> 'ccxt_agg_ob_provider_BTC-USDT'
param['job_name'] = 'ccxt_agg_ob_provider_' + normalized_symbol.translate(str.maketrans({':': '_', '/': '-'}))
# Every logging.Formatter renders %(asctime)s in UTC instead of local time.
logging.Formatter.converter = time.gmtime

log_level = logging.INFO  # verbosity ladder: DEBUG --> INFO --> WARNING --> ERROR
format_str = '%(asctime)s %(message)s'
formatter = logging.Formatter(format_str)

# Console output via a single stream handler.
sh = logging.StreamHandler()
sh.setFormatter(formatter)
sh.setLevel(log_level)

# Configure the root logger.
logger = logging.getLogger()
logger.setLevel(log_level)
logger.addHandler(sh)

# Optional file output (disabled): attach logging.FileHandler(f"{param['job_name']}.log")
# configured with the same level and formatter as `sh`.
# Per-venue request params passed to ccxt fetch_order_book. Each caps `depth` at the
# maximum levels per side used for that venue, under the venue's own parameter name.
coinbase_param : Dict[str, int] = { }  # no explicit depth: venue default
binance_param : Dict[str, int] = {
    'limit' : min(depth, 5000),  # Binance spot: limit up to 5000
}
okx_param : Dict[str, int] = {
    'sz' : min(depth, 400),  # OKX: sz up to 400 per side
}
bybit_param : Dict[str, int] = {
    'limit' : min(depth, 200),  # Bybit spot: limit up to 200
}
kraken_param : Dict[str, int] = {
    # Kraken's public Depth endpoint takes `count` (1..500). A `depth` key is not a Kraken
    # parameter, so the requested depth was not honoured.
    'count' : min(depth, 500),
}
def _exchange_config() -> Dict:
    """Return a fresh ccxt constructor config that selects `market_type` as the default market.

    ccxt reads `defaultType` from the exchange's `options` dict. A top-level 'defaultType'
    key only sets an unused attribute on the instance, so the configured market type was
    previously ignored.
    """
    return {'options': {'defaultType': market_type}}


coinbase_exchange = coinbase(_exchange_config())
binance_exchange = binance(_exchange_config())
okx_exchange = okx(_exchange_config())
bybit_exchange = bybit(_exchange_config())
kraken_exchange = kraken(_exchange_config())
class LogLevel(Enum):
    """Severity levels accepted by `log`; values mirror the stdlib `logging` constants."""

    CRITICAL = logging.CRITICAL  # 50
    ERROR = logging.ERROR  # 40
    WARNING = logging.WARNING  # 30
    INFO = logging.INFO  # 20
    DEBUG = logging.DEBUG  # 10
    NOTSET = logging.NOTSET  # 0
def log(message : str, log_level : LogLevel = LogLevel.INFO):
    """Emit `message` on the root logger, prefixed with the local wall-clock time.

    Levels below WARNING (INFO, DEBUG, NOTSET) are all emitted via `logger.info`.
    WARNING and ERROR map to their namesakes. CRITICAL goes to `logger.critical`;
    previously it matched no branch and the message was silently dropped.
    """
    # NOTE: this prefix uses local time via datetime.now(); the handler's own
    # %(asctime)s is formatted with time.gmtime, so the two may differ.
    stamped = f"{datetime.now()} {message}"
    level = log_level.value
    if level < LogLevel.WARNING.value:
        logger.info(stamped)
    elif level == LogLevel.WARNING.value:
        logger.warning(stamped)
    elif level == LogLevel.ERROR.value:
        logger.error(stamped)
    else:
        logger.critical(stamped)
async def _fetch_orderbook(symbol : str, exchange : Exchange, fetch_ob_params : Dict):
    """Fetch one venue's order book and summarise its price range.

    `exchange.fetch_order_book` is a blocking (synchronous ccxt) call, so it is run on
    the event loop's default executor. This lets the caller's `asyncio.gather` fetch all
    venues concurrently; previously each call blocked the loop and they ran one after
    another.

    Args:
        symbol: unified ccxt symbol, e.g. 'BTC/USDT'.
        exchange: ccxt exchange instance to query.
        fetch_ob_params: venue-specific request params passed through to ccxt.

    Returns:
        dict with 'source' (exchange.name) and 'is_valid'. On success it also holds
        'orderbook' (the raw ccxt book), 'mid', 'min_bid_price', 'max_bid_price',
        'min_ask_price' and 'max_ask_price'. 'is_valid' is False when the fetch fails,
        either side is empty, or the book's timestamp is more than
        param['ts_delta_observation_ms_threshold'] ms behind the local clock.
    """
    try:
        loop = asyncio.get_running_loop()
        ob = await loop.run_in_executor(
            None, lambda: exchange.fetch_order_book(symbol=symbol, params=fetch_ob_params)
        )

        is_valid = True
        update_ts_ms = ob.get('timestamp')
        if update_ts_ms:
            # Staleness check: local now minus the exchange-reported book timestamp.
            ts_delta_observation_ms = int(datetime.now().timestamp() * 1000) - update_ts_ms
            is_valid = ts_delta_observation_ms <= param['ts_delta_observation_ms_threshold']

        bid_prices = [x[0] for x in ob['bids']]
        ask_prices = [x[0] for x in ob['asks']]
        if not bid_prices or not ask_prices:
            raise ValueError(f"empty book side (bids: {len(bid_prices)}, asks: {len(ask_prices)})")

        min_bid_price, max_bid_price = min(bid_prices), max(bid_prices)
        min_ask_price, max_ask_price = min(ask_prices), max(ask_prices)
        mid = (max_bid_price + min_ask_price) / 2
        log(f"{exchange.name} mid: {mid}, min_bid_price: {min_bid_price}, max_bid_price: {max_bid_price}, min_ask_price: {min_ask_price}, max_ask_price: {max_ask_price}, range: {max_ask_price-min_bid_price}")
        return {
            'source' : exchange.name,
            'orderbook' : ob,
            'mid' : mid,
            'min_bid_price' : min_bid_price,
            'max_bid_price' : max_bid_price,
            'min_ask_price' : min_ask_price,
            'max_ask_price' : max_ask_price,
            'is_valid' : is_valid
        }
    except Exception as fetch_err:
        # Best-effort per venue: one failing exchange must not abort the whole round.
        log(f"_fetch_orderbook failed for {exchange.name}: {fetch_err}", LogLevel.ERROR)
        return {
            'source' : exchange.name,
            'is_valid' : False
        }
def _round_to_nearest(price, increment):
    """Snap `price` to the nearest multiple of `increment` (Python round: ties to even)."""
    return round(price / increment) * increment


def _reference_mid(valid_orderbooks: List[Dict], source: str = 'OKX') -> float:
    """Return the mid of the first book from `source`.

    Falls back to the median mid of all valid books when that venue is missing (fetch
    failed or book flagged stale). Previously this case raised IndexError and lost the
    whole round.
    """
    for ob in valid_orderbooks:
        if ob['source'] == source:
            return ob['mid']
    return float(np.median([ob['mid'] for ob in valid_orderbooks]))


def _aggregate_side(valid_orderbooks: List[Dict], side: str, keep, mid: float) -> Dict:
    """Merge one side ('bids' or 'asks') of every valid book into price buckets.

    Each raw level whose price satisfies `keep(price)` is snapped to the nearest
    `price_level_increment`, and its amount is summed into that bucket. 'amount_usdt'
    values the bucket's base amount at `mid`, not at the bucket price.
    """
    buckets: Dict = {}
    for orderbook in valid_orderbooks:
        for level in orderbook['orderbook'][side]:
            raw_price, amount = level[0], level[1]
            if not keep(raw_price):
                continue
            price = _round_to_nearest(raw_price, price_level_increment)
            existing_amount = buckets[price]['amount'] if price in buckets else 0
            amount_in_base_ccy = existing_amount + amount
            buckets[price] = {
                'price' : price,
                'amount' : amount_in_base_ccy,
                'amount_usdt' : amount_in_base_ccy * mid
            }
    return buckets


def _side_tables(levels: Dict):
    """Turn aggregated levels into (display DataFrame, total USD notional).

    Rows are ordered by price, highest first. 'is_max_amount_usdt' flags the bucket with
    the largest notional: resistance on the ask side, support on the bid side. In the
    display frame, 'amount_usdt' is a thousands-separated string.
    """
    ordered = dict(sorted(levels.items(), key=lambda item: item[0], reverse=True))
    frame = pd.DataFrame(ordered).transpose()
    total_usdt = pd.to_numeric(frame['amount_usdt']).sum()
    frame['str_amount_usdt'] = frame['amount_usdt'].apply(lambda x: f'{x:,.2f}')
    frame['is_max_amount_usdt'] = frame.index == frame['amount_usdt'].idxmax()
    # Select and rename into a new frame; rename(..., inplace=True) on a column
    # selection triggers pandas' SettingWithCopyWarning.
    display = frame[['price', 'amount', 'str_amount_usdt', 'is_max_amount_usdt']].rename(
        columns={'str_amount_usdt': 'amount_usdt'}
    )
    return display, total_usdt


async def main():
    """Poll all venues forever, log an aggregated order-book ladder and track imbalance.

    Each iteration:
    - fetches `normalized_symbol` books from Coinbase, Binance, Bybit, Kraken and OKX;
    - merges valid books into `price_level_increment` buckets over the price range that
      every valid book covers;
    - logs both sides plus the bid/ask notional imbalance;
    - appends one row to an imbalance history.

    Every `update_imabalce_csv_intervals` iterations it recomputes the EMA of
    pct_imbalance (span `sliding_window_num_intervals`) and writes the history to
    imbalances.csv. A failed iteration is logged and skips its history row; previously
    it recorded stale values, or crashed the loop with NameError on the first round.
    """
    # NOTE(review): the history grows without bound and is re-written in full to CSV.
    pd_imbalances = pd.DataFrame(columns=['timestamp_ms', 'datetime', 'mid', 'imbalance', 'total_amount', 'pct_imbalance', 'ema_pct_imbalance'])
    last_ema_pct_imbalance = None
    i = 0
    while True:
        loop_start = time.time()
        # Set only when the whole iteration succeeds.
        imbalance_row = None
        try:
            orderbooks = await asyncio.gather(
                _fetch_orderbook(symbol=normalized_symbol, exchange=coinbase_exchange, fetch_ob_params=coinbase_param),
                _fetch_orderbook(symbol=normalized_symbol, exchange=binance_exchange, fetch_ob_params=binance_param),
                _fetch_orderbook(symbol=normalized_symbol, exchange=bybit_exchange, fetch_ob_params=bybit_param),
                _fetch_orderbook(symbol=normalized_symbol, exchange=kraken_exchange, fetch_ob_params=kraken_param),
                _fetch_orderbook(symbol=normalized_symbol, exchange=okx_exchange, fetch_ob_params=okx_param)
            )
            valid_orderbooks = [ob for ob in orderbooks if ob['is_valid']]
            invalid_orderbooks_names = " ".join(ob['source'] for ob in orderbooks if not ob['is_valid'])
            if not valid_orderbooks:
                raise ValueError(f"no valid orderbooks. Invalid books: {invalid_orderbooks_names}")

            # The highest per-book deepest bid and the lowest per-book deepest ask bound the
            # range every valid book covers. Levels outside it are excluded below.
            max_min_bid_price = max(ob['min_bid_price'] for ob in valid_orderbooks)
            best_bid_price = max(ob['max_bid_price'] for ob in valid_orderbooks)
            min_max_ask_price = min(ob['max_ask_price'] for ob in valid_orderbooks)
            best_ask_price = min(ob['min_ask_price'] for ob in valid_orderbooks)
            elapsed_ms = (time.time() - loop_start) * 1000
            logger.info(f"orderbooks fetch elapsed (ms): {elapsed_ms}, # orderbooks: {len(valid_orderbooks)}, max_min_bid_price: {max_min_bid_price}, min_max_ask_price: {min_max_ask_price}, best_bid_price: {best_bid_price}, best_ask_price: {best_ask_price}. Invalid books: {invalid_orderbooks_names}")

            mid = _reference_mid(valid_orderbooks)  # OKX mid as reference, median fallback
            aggregated_bids = _aggregate_side(valid_orderbooks, 'bids', lambda p: p > max_min_bid_price, mid)
            aggregated_asks = _aggregate_side(valid_orderbooks, 'asks', lambda p: p < min_max_ask_price, mid)
            if not aggregated_bids or not aggregated_asks:
                raise ValueError("no bid or ask levels inside the common price range")

            pd_aggregated_orderbooks_asks_, sum_asks_amount_usdt = _side_tables(aggregated_asks)
            pd_aggregated_orderbooks_bids_, sum_bids_amount_usdt = _side_tables(aggregated_bids)

            spread_bps = round((best_ask_price - best_bid_price) / mid * 10000, 0)
            imbalance = sum_bids_amount_usdt - sum_asks_amount_usdt
            total_amount = sum_bids_amount_usdt + sum_asks_amount_usdt
            pct_imbalance = (imbalance/total_amount) * 100
            ema_display = last_ema_pct_imbalance if last_ema_pct_imbalance is not None else '--'

            log(f"mid: {mid}, imbalance (bids - asks): {imbalance:,.0f}, pct_imbalance: {pct_imbalance:,.2f}, last_ema_pct_imbalance: {ema_display}, spread_bps between bests: {spread_bps} (If < 0, arb opportunity). Range {max_min_bid_price} - {min_max_ask_price} (${int(min_max_ask_price-max_min_bid_price)})")
            log(f"asks USD {sum_asks_amount_usdt:,.0f}, best: {best_ask_price:,.2f}")
            log(f"{tabulate(pd_aggregated_orderbooks_asks_.reset_index(drop=True), headers='keys', tablefmt='psql', colalign=('right', 'right', 'right'), showindex=False)}")
            log(f"bids USD {sum_bids_amount_usdt:,.0f}, best: {best_bid_price:,.2f}")
            log(f"{tabulate(pd_aggregated_orderbooks_bids_.reset_index(drop=True), headers='keys', tablefmt='psql', colalign=('right', 'right', 'right'), showindex=False)}")

            imbalance_row = [int(loop_start*1000), datetime.fromtimestamp(loop_start), mid, imbalance, total_amount, pct_imbalance, np.nan]
        except Exception as loop_err:
            log(f"#{i} Error: {loop_err}", LogLevel.ERROR)
        finally:
            log(f"#{i} loop elapsed (ms): {(time.time()-loop_start)*1000}")

        if imbalance_row is not None:
            pd_imbalances.loc[i] = imbalance_row
        if i % update_imabalce_csv_intervals == 0:
            if pd_imbalances.shape[0] > sliding_window_num_intervals:
                pd_imbalances['ema_pct_imbalance'] = pd_imbalances['pct_imbalance'].ewm(span=sliding_window_num_intervals, adjust=False).mean()
                last_ema_pct_imbalance = pd_imbalances['ema_pct_imbalance'].iloc[-1]
            pd_imbalances.to_csv("imbalances.csv")
        i += 1
# Start the infinite polling loop only when run as a script, so importing this module
# (e.g. to reuse _fetch_orderbook) does not block forever.
if __name__ == "__main__":
    asyncio.run(main())
# (End of GitHub gist page. Footer text: "Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment")