Skip to content

Instantly share code, notes, and snippets.

@MtkN1
Created April 26, 2022 17:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MtkN1/1dc45c33fb67dc04b1c504ea0f970f2d to your computer and use it in GitHub Desktop.
Save MtkN1/1dc45c33fb67dc04b1c504ea0f970f2d to your computer and use it in GitHub Desktop.
import asyncio
import datetime
import logging

import aiohttp
from aiohttp import web
# Interval in seconds. It is passed to ws_connect(heartbeat=...) and is also the
# delay ping() waits before sending each application-level ping message.
heartbeat: float = 10.0
# Maps each monitored WebSocket URL to the ISO-8601 timestamp (naive, UTC+09:00)
# of the most recent message received on it, or None until the first message
# arrives. Served as JSON by get_status().
status: dict[str, str | None] = {}
async def ping(ws: aiohttp.ClientWebSocketResponse, specific_ping: str):
    """Send ``specific_ping`` as a text message every ``heartbeat`` seconds.

    Runs until sending fails with ``ConnectionError`` and then returns
    quietly. It is meant to be run as a background task next to the reader
    loop of a single WebSocket connection.

    Args:
        ws: The open WebSocket to send the keep-alive message on.
        specific_ping: The exact text payload to send as the ping.
    """
    # A single long-lived loop replaces the former "send once, then spawn a new
    # task" chain: tasks created without keeping a reference may be
    # garbage-collected before they finish, and a chain of them cannot be
    # cancelled by the caller.
    while True:
        await asyncio.sleep(heartbeat)
        try:
            await ws.send_str(specific_ping)
        except ConnectionError:
            # The connection is gone; nothing left to keep alive.
            return
async def ws_session(
    session: aiohttp.ClientSession,
    url: str,
    send_str: str | None = None,
    specific_ping: str | None = None,
):
    """Keep a WebSocket connection to ``url`` open and record message arrival times.

    ``status[url]`` starts as ``None`` and is overwritten with the current time
    (naive ISO-8601 string in UTC+09:00) every time a message is received.
    The function never returns on its own: when the connection ends it
    reconnects, and when connecting or reading raises an exception it logs a
    warning and waits 60 seconds before trying again.

    Args:
        session: The aiohttp client session used to open the connection.
        url: The WebSocket endpoint to connect to; also the key in ``status``.
        send_str: Optional text message sent once right after connecting.
        specific_ping: Optional text payload that is sent periodically by
            ``ping()`` for as long as the connection is open.
    """
    logger = logging.getLogger(__name__)
    # Taking "now" directly in the target zone gives the same wall-clock value
    # as converting naive local time -> UTC -> UTC+9, without the round trip.
    report_tz = datetime.timezone(datetime.timedelta(hours=9))
    status[url] = None
    while True:
        try:
            async with session.ws_connect(url, heartbeat=heartbeat) as ws:
                # Keep a reference to the ping task so it can be cancelled as
                # soon as this connection is finished.
                ping_task = (
                    asyncio.create_task(ping(ws, specific_ping))
                    if specific_ping
                    else None
                )
                try:
                    if send_str:
                        # Awaited directly so that a failed subscribe is
                        # handled by the reconnect logic instead of being lost
                        # in an unobserved background task.
                        await ws.send_str(send_str)
                    async for _ in ws:
                        status[url] = (
                            datetime.datetime.now(report_tz)
                            .replace(tzinfo=None)
                            .isoformat()
                        )
                finally:
                    if ping_task is not None:
                        ping_task.cancel()
        except Exception as exc:
            # Best-effort monitor: report the failure, back off, then retry.
            logger.warning(
                "WebSocket session for %s failed (%r); reconnecting in 60 s",
                url,
                exc,
            )
            await asyncio.sleep(60.0)
async def main():
    """Monitor every configured exchange endpoint concurrently.

    Opens one shared ``aiohttp.ClientSession`` and runs a ``ws_session``
    coroutine per endpoint under ``asyncio.gather``.
    """
    # Each entry: (url, subscribe message or None, periodic ping payload or None)
    endpoints = (
        # Bybit
        ("wss://stream.bybit.com/realtime", '{"op":"subscribe","args":["orderBookL2_25.BTCUSD"]}', None),
        ("wss://stream.bybit.com/realtime_public", '{"op":"subscribe","args":["orderBookL2_25.BTCUSDT"]}', None),
        ("wss://stream.bybit.com/spot/quote/ws/v1", '{"topic":"diffDepth","event":"sub","symbol":"BTCUSDT","params":{"binary":false}}', None),
        ("wss://stream.bybit.com/spot/quote/ws/v2", '{"topic":"depth","event":"sub","params":{"symbol":"BTCUSDT","binary":false}}', None),
        # Binance
        ("wss://stream.binance.com:9443/stream?streams=btcusdt@depth@100ms", None, None),
        ("wss://fstream.binance.com/stream?streams=btcusdt@depth@100ms", None, None),
        ("wss://dstream.binance.com/stream?streams=btcusd_perp@depth@100ms", None, None),
        # OKX
        ("wss://ws.okx.com:8443/ws/v5/public", '{"op":"subscribe","args":[{"channel":"books","instId":"BTC-USDT-SWAP"}]}', None),
        # Phemex
        ("wss://phemex.com/ws", '{"id":1234,"method":"orderbook.subscribe","params":["BTCUSD"]}', None),
        # Bitget
        ("wss://ws.bitget.com/spot/v1/stream", '{"op":"subscribe","args":[{"instType":"sp","channel":"books","instId":"BTCUSDT"}]}', "ping"),
        ("wss://ws.bitget.com/mix/v1/stream", '{"op":"subscribe","args":[{"instType":"mc","channel":"books","instId":"BTCUSDT"}]}', "ping"),
        # MEXC
        ("wss://contract.mexc.com/ws", '{"method":"sub.depth","param":{"symbol":"BTC_USDT"}}', '{"method":"ping"}'),
        # FTX
        ("wss://ftx.com/ws/", '{"op":"subscribe","channel":"orderbook","market":"BTC-PERP"}', None),
        # BitMEX
        ("wss://ws.bitmex.com/realtime", '{"op":"subscribe","args":["orderBookL2_25:XBTUSD"]}', None),
        # bitFlyer
        ("wss://io.lightstream.bitflyer.com/socket.io/?EIO=3&transport=websocket", '42["subscribe","lightning_board_FX_BTC_JPY"]', "2"),
        ("wss://ws.lightstream.bitflyer.com/json-rpc", '{"method":"subscribe","params":{"channel":"lightning_board_FX_BTC_JPY"},"id":123}', None),
        # GMO Coin
        ("wss://api.coin.z.com/ws/public/v1", '{"command":"subscribe","channel":"orderbooks","symbol":"BTC_JPY"}', None),
        # Liquid
        ("wss://tap.liquid.com/app/LiquidTapClient", '{"event":"pusher:subscribe","data":{"channel":"price_ladders_cash_btcjpy"}}', None),
        # bitbank
        ("wss://stream.bitbank.cc/socket.io/?EIO=3&transport=websocket", '42["join-room","depth_diff_btc_jpy"]', "2"),
        # Coincheck
        ("wss://ws-api.coincheck.com/", '{"type":"subscribe","channel":"btc_jpy-orderbook"}', None),
    )
    async with aiohttp.ClientSession() as session:
        monitors = [
            ws_session(session, endpoint_url, subscribe_msg, specific_ping=keepalive)
            for endpoint_url, subscribe_msg, keepalive in endpoints
        ]
        await asyncio.gather(*monitors)
async def start_background_tasks(app):
    """Startup hook: schedule ``main()`` as a task and keep it on ``app``.

    The task is stored under the ``'background_task'`` key so that the
    cleanup hook can find it later.
    """
    monitor_task = asyncio.create_task(main())
    app['background_task'] = monitor_task
async def cleanup_background_tasks(app):
    """Cleanup hook: cancel the task stored in ``app['background_task']`` and wait for it.

    Awaiting a task that has just been cancelled raises
    ``asyncio.CancelledError``; that is the expected outcome here, so it is
    suppressed instead of being propagated out of the cleanup hook. Any other
    exception the task finished with is still raised.
    """
    task = app['background_task']
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Normal result of the cancel() above; shutdown can proceed.
        pass
async def get_status(request):
    """HTTP handler: respond with the module-level ``status`` mapping as JSON.

    ``request`` is accepted because handlers receive it, but it is not used.
    """
    current = status
    return web.json_response(current)
if __name__ == "__main__":
    # Run the HTTP status endpoint; the WebSocket monitors are started and
    # stopped through the application's startup/cleanup hooks.
    app = web.Application()
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    routes = [web.get('/', get_status)]
    app.add_routes(routes)
    web.run_app(app)
    # Alternative without the HTTP endpoint: asyncio.run(main()), ignoring
    # KeyboardInterrupt on exit.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment