Skip to content

Instantly share code, notes, and snippets.

@anandanand84
Created August 12, 2020 14:12
Show Gist options
  • Save anandanand84/0918f61670a5f597b210105ef2f97539 to your computer and use it in GitHub Desktop.
Save anandanand84/0918f61670a5f597b210105ef2f97539 to your computer and use it in GitHub Desktop.
import asyncio
import websockets
import hashlib
import hmac
import base64
import time
import json
# API credentials used by get_headers(); both are blank here and must be
# filled in before the signed handshake can be expected to succeed.
key = ""
secret = ""

# Websocket endpoint the client connects to.
host = "wss://demo.dvchain.co/websocket"
def get_headers(api_key=None, api_secret=None, timestamp_ms=None, timewindow=2000):
    """Build the signed authentication headers for the websocket handshake.

    The signature is the base64-encoded HMAC-SHA256 digest of the UTF-8 text
    ``f"{api_key}{timestamp_ms}{timewindow}"``, keyed with ``api_secret``.

    Args:
        api_key: Key sent as ``dv-api-key`` and included in the signed
            message. Defaults to the module-level ``key``.
        api_secret: Secret used as the HMAC key. Defaults to the
            module-level ``secret``.
        timestamp_ms: Integer timestamp sent as ``dv-timestamp``. Defaults to
            the current time in milliseconds since the epoch.
        timewindow: Value sent as ``dv-timewindow`` and included in the
            signed message (presumably milliseconds -- confirm against the
            server's API documentation).

    Returns:
        A list of ``(header_name, value)`` tuples in the order
        ``dv-api-key``, ``dv-timestamp``, ``dv-timewindow``, ``dv-signature``.
        The timestamp and time window are returned as ints, exactly as
        before; the signature is a ``str``.
    """
    # Fall back to the module-level credentials only when none were supplied,
    # so a call with no arguments behaves exactly as it always has.
    if api_key is None:
        api_key = key
    if api_secret is None:
        api_secret = secret
    if timestamp_ms is None:
        timestamp_ms = int(round(time.time() * 1000))

    message_bytes = f'{api_key}{timestamp_ms}{timewindow}'.encode('utf-8')
    secret_bytes = api_secret.encode('utf-8')

    # Named `mac` rather than `hash` to avoid shadowing the builtin.
    mac = hmac.new(secret_bytes, message_bytes, hashlib.sha256)
    signature = base64.b64encode(mac.digest()).decode('utf-8')

    return [
        ('dv-api-key', api_key),
        ('dv-timestamp', timestamp_ms),
        ('dv-timewindow', timewindow),
        ('dv-signature', signature),
    ]
async def start():
    """Open the websocket, request the available symbols and print the reply.

    The handshake carries the headers produced by ``get_headers()``; those
    headers are printed before connecting. A single ``availablesymbols``
    request is then sent as JSON, and the first message received back is
    printed as-is.
    """
    auth_headers = get_headers()
    print(auth_headers)

    # Serialize the request up front; it does not depend on the connection.
    request_payload = json.dumps({
        'type': 'request-response',
        'event': '1',
        'topic': 'availablesymbols',
    })

    async with websockets.connect(host, extra_headers=auth_headers) as connection:
        await connection.send(request_payload)
        reply = await connection.recv()
        print(reply)
# Run the client to completion. asyncio.run() creates and closes its own event
# loop; asyncio.get_event_loop() without a running loop is deprecated and
# raises RuntimeError on newer Python versions.
asyncio.run(start())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment