Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save oliver-zehentleitner/ea3cf73df07dbc3fc0f618fae81ff659 to your computer and use it in GitHub Desktop.

Select an option

Save oliver-zehentleitner/ea3cf73df07dbc3fc0f618fae81ff659 to your computer and use it in GitHub Desktop.
Query BIDS from all UBDCC DepthCaches.
from pprint import pprint
from unicorn_binance_local_depth_cache import BinanceLocalDepthCacheManager, DepthCacheClusterNotReachableError
import asyncio
# Forwarded as `debug=` to the cluster queries issued in main().
debug: bool = False
# Passed as `limit_count` to every get_bids_async() call in main().
limit_count: int = 5
# Passed as `threshold_volume` to every get_bids_async() call in main().
threshold_volume: float = 20000.0
# Placeholder value: replace it with the address of your UBDCC before running the script.
ubdcc_address: str = "YOUR_UBDCC_IP"
# Port handed to BinanceLocalDepthCacheManager together with `ubdcc_address`.
ubdcc_port: int = 80
async def main():
    """Request the bids of every DepthCache listed by the cluster and print a summary.

    Uses the module-level ``ubldc`` manager and the ``debug``, ``limit_count``,
    ``threshold_volume`` and ``ubdcc_address`` settings. For each exchange in the
    cluster's DepthCache list, the bids of every market are requested and printed.
    Responses carrying an ``error_id`` are tallied per error id, and the totals are
    printed once all markets have been queried.
    """
    dc = await ubldc.cluster.get_depthcache_list_async(debug=debug)
    # Treat the list response like the bids responses below instead of failing with a bare KeyError.
    # NOTE(review): presumably the list call reports failures through the same 'error_id'/'message'
    # fields as get_bids_async() - confirm against the UBDCC API.
    if dc.get('error_id') is not None or 'depthcache_list' not in dc:
        print(f"Fetching the DepthCache list failed: {dc.get('error_id')} - {dc.get('message')}")
        pprint(dc)
        return

    errors = {}
    non_working_caches = []
    working_caches = []
    for dcl_exchange, markets in dc['depthcache_list'].items():
        print(f"Testing {len(markets)} DepthCaches for exchange '{dcl_exchange}' on UBDCC "
              f"'{ubdcc_address}'!")
        # The running number restarts at 1 for every exchange.
        for loop, market in enumerate(markets, start=1):
            bids = await ubldc.cluster.get_bids_async(exchange=dcl_exchange, market=market, limit_count=limit_count,
                                                      threshold_volume=threshold_volume, debug=debug)
            error_id = bids.get('error_id')
            if error_id is not None:
                print(f"Bids from DepthCache #{loop} '{market}' failed: {error_id} - {bids.get('message')}")
                pprint(bids)
                # Count how often each error id occurred.
                errors[error_id] = errors.get(error_id, 0) + 1
                non_working_caches.append(market)
            else:
                print(f"Bids (limit_count={limit_count}, threshold_volume={threshold_volume}) from DepthCache #{loop} "
                      f"'{market}':")
                pprint(bids)
                working_caches.append(market)

    print(f"Successful working caches: {len(working_caches)}")
    if errors:
        # Report the failing caches as well; they were collected but never shown before.
        print(f"Non-working caches: {len(non_working_caches)}")
        print("ERRORS:")
        pprint(errors)
    # NOTE(review): short pause before returning - presumably gives the cluster client time to
    # finish pending work; confirm that it is still needed and that it belongs at function level.
    await asyncio.sleep(1)
# Script entry point. The manager is bound to the module-level name `ubldc`, which main() reads.
try:
    with BinanceLocalDepthCacheManager(ubdcc_address=ubdcc_address, ubdcc_port=ubdcc_port) as ubldc:
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            # Ctrl+C ends the run quietly instead of printing a traceback.
            print("\r\nGracefully stopping ...")
        except Exception as run_error:
            # Top-level boundary: report any failure raised while main() was running.
            print(f"ERROR: {run_error}")
except DepthCacheClusterNotReachableError as cluster_error:
    # Handled outside the `with` block, so it also covers errors raised while the manager is created.
    print(f"ERROR: {cluster_error}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment