Skip to content

Instantly share code, notes, and snippets.

@oliver-zehentleitner
Last active June 1, 2026 12:50
Show Gist options
  • Select an option

  • Save oliver-zehentleitner/9421166ce95e9a285beaaca238875010 to your computer and use it in GitHub Desktop.

Select an option

Save oliver-zehentleitner/9421166ce95e9a285beaaca238875010 to your computer and use it in GitHub Desktop.
Stop all UBDCC DepthCaches.
from pprint import pprint
from unicorn_binance_local_depth_cache import BinanceLocalDepthCacheManager, DepthCacheClusterNotReachableError
import asyncio
ubdcc_address: str = "YOUR_UBDCC_IP"
ubdcc_port: int = 80
async def main():
dc = await ubldc.cluster.get_depthcache_list_async()
for dcl_exchange in dc['depthcache_list']:
print(f"Stopping {len(dc['depthcache_list'][dcl_exchange])} DepthCaches for exchange '{dcl_exchange}' on UBDCC "
f"'{ubdcc_address}'!")
loop = 1
for market in dc['depthcache_list'][dcl_exchange]:
print(f"Stopping DepthCache #{loop}: {market}")
pprint(ubldc.cluster.stop_depthcache(exchange=dcl_exchange, market=market))
loop += 1
await asyncio.sleep(1)
try:
with BinanceLocalDepthCacheManager(ubdcc_address=ubdcc_address, ubdcc_port=ubdcc_port) as ubldc:
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\r\nGracefully stopping ...")
except DepthCacheClusterNotReachableError as error_msg:
print(f"ERROR: {error_msg}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment