Last active
June 1, 2026 12:50
-
-
Save oliver-zehentleitner/9421166ce95e9a285beaaca238875010 to your computer and use it in GitHub Desktop.
Stop all UBDCC DepthCaches.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from pprint import pprint | |
| from unicorn_binance_local_depth_cache import BinanceLocalDepthCacheManager, DepthCacheClusterNotReachableError | |
| import asyncio | |
# Address of the UBDCC instance to talk to. The value below is a placeholder
# and must be replaced before running; it is passed to
# BinanceLocalDepthCacheManager and echoed in the progress output.
ubdcc_address: str = "YOUR_UBDCC_IP"
# Port passed to BinanceLocalDepthCacheManager together with the address.
ubdcc_port: int = 80
async def main():
    """Stop every DepthCache listed by the UBDCC cluster, exchange by exchange.

    Uses the module-level ``ubldc`` manager to fetch the DepthCache list,
    prints how many DepthCaches each exchange holds, then stops them one at a
    time and pretty-prints the response of every stop request.
    """
    listing = await ubldc.cluster.get_depthcache_list_async()
    for exchange in listing['depthcache_list']:
        markets = listing['depthcache_list'][exchange]
        print(f"Stopping {len(markets)} DepthCaches for exchange '{exchange}' on UBDCC "
              f"'{ubdcc_address}'!")
        # Progress numbering is 1-based and restarts for every exchange.
        for position, market in enumerate(markets, start=1):
            print(f"Stopping DepthCache #{position}: {market}")
            pprint(ubldc.cluster.stop_depthcache(exchange=exchange, market=market))
            # NOTE(review): the source's indentation was lost; this pause is
            # assumed to run once per stopped DepthCache -- confirm.
            await asyncio.sleep(1)
# Script entry: open a manager for the configured UBDCC address/port and run
# main() to completion.
try:
    # The manager is bound to the module-level name `ubldc`, which main()
    # reads as a global.
    with BinanceLocalDepthCacheManager(ubdcc_address=ubdcc_address, ubdcc_port=ubdcc_port) as ubldc:
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            # Ctrl+C is handled inside the `with` block, so the interrupt
            # itself does not propagate into the manager's exit.
            print("\r\nGracefully stopping ...")
except DepthCacheClusterNotReachableError as error_msg:
    # Raised by the library; report it as a one-line error instead of a
    # traceback.
    print(f"ERROR: {error_msg}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment