Skip to content

Instantly share code, notes, and snippets.

@craigderington
Created April 23, 2022 16:34
Show Gist options
  • Save craigderington/5d06d9d66f69c4890da35282d3c06855 to your computer and use it in GitHub Desktop.
Save craigderington/5d06d9d66f69c4890da35282d3c06855 to your computer and use it in GitHub Desktop.
from signalr_aio import Connection
from base64 import b64decode
from zlib import decompress, MAX_WBITS
import json
# decode the payload
def process_message(message):
deflated_msg = decompress(b64decode(message), -MAX_WBITS)
return json.loads(deflated_msg.decode())
# debug message handler.
async def on_debug(**msg):
# In case of 'queryExchangeState'
if 'R' in msg and type(msg['R']) is not bool:
decoded_msg = process_message(msg['R'])
print(decoded_msg)
# error handler
async def on_error(msg):
print(msg)
# hub message handler
async def on_message(msg):
decoded_msg = process_message(msg[0])
print(decoded_msg)
if __name__ == "__main__":
# create connection
connection = Connection('https://pro.coinbase.com/signalr', session=None)
# register the hub
hub = connection.register_hub('c2')
# register debug handler
connection.received += on_debug
# register error handler
connection.error += on_error
# hub message handler
hub.client.on('', on_message)
hub.client.on('', on_message)
# subscribe to topics
hub.server.invoke('SubscribeToExchangeDeltas', 'BTC-USD')
hub.server.invoke('SubscribeToSummaryDeltas')
hub.server.invoke('queryExchangeState', 'BTC-USDC')
# Start the client
connection.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment