Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save oliver-zehentleitner/e0c55876d5c80d06ebf1e36409f94152 to your computer and use it in GitHub Desktop.

Select an option

Save oliver-zehentleitner/e0c55876d5c80d06ebf1e36409f94152 to your computer and use it in GitHub Desktop.
Creates a UBDCC DepthCache with 2 replicas for every active futures market (status == "TRADING") on binance.com-futures.
from pprint import pprint
from unicorn_binance_local_depth_cache import BinanceLocalDepthCacheManager, DepthCacheClusterNotReachableError
from unicorn_binance_rest_api import BinanceRestApiManager
import asyncio
# Exchange identifier passed to the REST API manager, the DepthCache manager
# and the cluster's create call.
exchange: str = "binance.com-futures"
# Address of the UBDCC to talk to; the value is a placeholder that must be
# replaced before running the script.
ubdcc_address: str = "YOUR_UBDCC_IP"
# Port handed to the DepthCache manager as `ubdcc_port`.
ubdcc_port: int = 80
async def main():
    """Request a DepthCache on the UBDCC for every symbol that is trading.

    Fetches the futures exchange info over REST, keeps the symbols whose
    status is "TRADING", asks the cluster to create a DepthCache for each of
    them with ``desired_quantity=2``, then prints a summary line followed by
    the cluster's reply.

    Uses the module-level ``ubldc`` manager, which the script binds before
    it starts the event loop.
    """
    with BinanceRestApiManager(exchange=exchange) as rest_api:
        futures_info = rest_api.futures_exchange_info()

    symbol_entries = futures_info['symbols']
    markets = [entry['symbol'] for entry in symbol_entries if entry['status'] == "TRADING"]

    response = await ubldc.cluster.create_depthcaches_async(
        exchange=exchange,
        markets=markets,
        desired_quantity=2,
    )

    print(f"Adding {len(markets)} DepthCaches for exchange '{exchange}' on UBDCC '{ubdcc_address}':")
    pprint(response)
# Script entry: open the DepthCache manager (bound to the global `ubldc` that
# main() reads) and drive main() to completion on a fresh event loop.
try:
    with BinanceLocalDepthCacheManager(
        exchange=exchange,
        ubdcc_address=ubdcc_address,
        ubdcc_port=ubdcc_port,
    ) as ubldc:
        # Errors from main() are handled inside the `with`, so the message is
        # printed before the manager's context exit runs.
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print("\r\nGracefully stopping ...")
        except Exception as run_error:
            # Top-level boundary: report the failure instead of a traceback.
            print(f"ERROR: {run_error}")
except DepthCacheClusterNotReachableError as cluster_error:
    print(f"ERROR: {cluster_error}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment