Skip to content

Instantly share code, notes, and snippets.

@marif4444
Created March 5, 2019 06:16
Show Gist options
  • Save marif4444/ca2e9fcccd49dd8f371a5d67bddc466c to your computer and use it in GitHub Desktop.
Save marif4444/ca2e9fcccd49dd8f371a5d67bddc466c to your computer and use it in GitHub Desktop.
Delta websocket connection
import websocket
import hashlib
import hmac
import base64
import datetime
import json
import time
import signal
from threading import Thread
def generate_signature(secret, message):
    """Return the hex-encoded HMAC-SHA256 of *message* keyed with *secret*.

    Args:
        secret: Signing key, as ``str`` (encoded as UTF-8) or ``bytes``.
        message: Payload to sign, as ``str`` (encoded as UTF-8) or ``bytes``.

    Returns:
        The digest as a 64-character lowercase hexadecimal string.
    """
    # Encode only text input so callers that already hold bytes are accepted
    # too (bytes(b"...", 'utf-8') would raise TypeError).
    if isinstance(message, str):
        message = message.encode('utf-8')
    if isinstance(secret, str):
        secret = secret.encode('utf-8')
    return hmac.new(secret, message, hashlib.sha256).hexdigest()
def get_time_stamp():
    """Return the current Unix time (whole seconds since 1970-01-01 UTC).

    Returns:
        The timestamp as a decimal string, e.g. ``"1551766560"``.
    """
    # Use a timezone-aware "now": datetime.utcnow() is deprecated since
    # Python 3.12 and yields a naive value that is easy to misuse.
    now = datetime.datetime.now(datetime.timezone.utc)
    return str(int(now.timestamp()))
def on_open(ws):
    """Report on stdout that the websocket connection has been opened.

    Args:
        ws: The object handed to the callback; not used here.
    """
    # NOTE: nothing is sent from this hook; a `ws.send(output_json)` call
    # that used to live here is disabled.
    print("on_open")
def on_message(ws, message):
    """Echo an incoming websocket message to stdout unchanged.

    Args:
        ws: The object handed to the callback; not used here.
        message: The received payload, printed as-is.
    """
    print(message)
def on_error(ws, error):
    """Print an error reported for the websocket connection to stdout.

    Args:
        ws: The object handed to the callback; not used here.
        error: The reported error, printed as-is.
    """
    print(error)
def on_close(ws, close_status_code=None, close_msg=None):
    """Report on stdout that the websocket connection has been closed.

    The two trailing parameters are optional so the callback works both when
    invoked with only the connection object and when the library additionally
    passes the close status code and close message.

    Args:
        ws: The object handed to the callback; not used here.
        close_status_code: Close status code, if supplied; not used here.
        close_msg: Close reason, if supplied; not used here.
    """
    print("close")
# Number of one-second polls to wait for the websocket to connect.
conn_timeout = 5

ws = websocket.WebSocketApp(
    'wss://api.delta.exchange:2096',
    on_open=on_open,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close,
)

# Run the websocket loop on a daemon thread so the main thread stays free to
# wait for the connection and send the subscription.
wst = Thread(name='Delta Websocket', target=ws.run_forever, daemon=True)
wst.start()


def _socket_connected(app):
    """Return True when *app* currently has a connected socket."""
    # Read app.sock once: the websocket thread may reset it concurrently.
    sock = app.sock
    return sock is not None and sock.connected


# Poll until connected or the budget is spent. The timeout check is applied
# to every iteration; previously operator precedence (`A or B and C`) let the
# loop spin forever while `ws.sock` was still unset.
while conn_timeout and not _socket_connected(ws):
    time.sleep(1)
    conn_timeout -= 1

# Decide on the actual socket state rather than the counter, so a connection
# that succeeds on the very last poll is not reported as a failure.
if not _socket_connected(ws):
    print(
        "Couldn't establish connetion with Delta websocket")
else:
    print('Connected')
    ws.send(json.dumps({
        "type": "subscribe",
        "payload": {
            "channels": [
                {
                    "name": 'l2_orderbook',
                    "symbols": ['BTCUSD_29Mar'],
                },
            ],
        },
    }))
    while True:  # In order to keep the main thread running
        time.sleep(5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment