Skip to content

Instantly share code, notes, and snippets.

@toransahu
Created May 14, 2021 08:39
Show Gist options
  • Save toransahu/eb89f95873eaf64cde29b21d739abac0 to your computer and use it in GitHub Desktop.
Save toransahu/eb89f95873eaf64cde29b21d739abac0 to your computer and use it in GitHub Desktop.
Mist WebSocket API Client CLI
import json
import os
import time
import websocket # websocket-client>=0.44.0
WS_URL = "wss://api-ws.mist.com/api-ws/v1/stream"
def on_ping(ws, *args, **kwargs):
print('on_ping: %s, %s' % (args, kwargs))
def on_pong(ws, *args, **kwargs):
print('on_pong: %s, %s' % (args, kwargs))
def on_open(ws):
print("onopen: connected to: %s" % WS_URL)
ws.send(json.dumps({"subscribe": "/test"}))
def on_message(ws, message):
print("onmessage: %s" % message)
def on_error(ws, error):
if isinstance(error, KeyboardInterrupt):
raise KeyboardInterrupt() # re-raise to be handled at outer try-catch
print("onerror: %s" % error)
def on_close(ws):
print("onclose: disconnected from %s" % WS_URL)
ws = websocket.WebSocketApp(
WS_URL,
header=[
"Authorization: Token %s" % os.environ["API_TOKEN_MIST"]
],
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
on_ping=on_ping,
on_pong=on_pong
)
try:
c = 1
while True:
ws.run_forever()
# ws.run_forever(ping_interval=30, ping_timeout=25) # or, try with PING
print("reconnect [%s]" % c)
time.sleep(5)
c += 1
except KeyboardInterrupt:
print('keyboard interrupted. exiting.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment