#!/usr/bin/env python
# coding=utf-8
# Source: GitHub gist alexbrillant/961502146a7fc5d03205f9b07b8535f5
# (created February 1, 2018).
import json
import threading

from autobahn.twisted.websocket import (
    WebSocketClientFactory,
    WebSocketClientProtocol,
    connectWS,
)
from twisted.internet import reactor, ssl
from twisted.internet.error import ReactorAlreadyRunning
from twisted.internet.protocol import ReconnectingClientFactory

from binance.client import Client
class BinanceClientProtocol(WebSocketClientProtocol):
    """WebSocket protocol that decodes JSON text frames for the factory callback.

    The owning factory is expected to expose a ``callback`` attribute
    (assigned by ``BinanceSocketManager._start_socket``) that receives each
    decoded message.
    """

    def onConnect(self, response):
        """Handle a successful connection.

        :param response: connection response supplied by autobahn (unused)
        """
        # reset the delay after reconnecting
        self.factory.resetDelay()

    def onMessage(self, payload, isBinary):
        """Decode a text frame as JSON and pass the result to the callback.

        Binary frames are ignored. Frames that are not valid UTF-8 JSON are
        dropped silently rather than tearing down the connection.

        :param payload: raw frame payload
        :type payload: bytes
        :param isBinary: True when the frame is a binary frame
        :type isBinary: bool
        """
        if isBinary:
            return

        try:
            payload_obj = json.loads(payload.decode('utf8'))
        except ValueError:
            # Covers both JSON errors and UnicodeDecodeError (a ValueError
            # subclass); a malformed frame is deliberately skipped.
            return

        self.factory.callback(payload_obj)
class BinanceReconnectingClientFactory(ReconnectingClientFactory):
    """Reconnecting factory with retry settings overridden for this client."""

    # set initial delay to a short time
    initialDelay = 0.1

    # upper bound for the delay between reconnect attempts
    maxDelay = 10

    # number of reconnect attempts before giving up
    maxRetries = 5
class BinanceClientFactory(WebSocketClientFactory, BinanceReconnectingClientFactory):
    """WebSocket client factory that reconnects and reports when it gives up.

    A ``callback`` attribute must be assigned on the instance before
    connecting (see ``BinanceSocketManager._start_socket``); it receives the
    error payload once the retry budget is exhausted.
    """

    protocol = BinanceClientProtocol
    _reconnect_error_payload = {
        'e': 'error',
        'm': 'Max reconnect retries reached'
    }

    def _retry_or_report(self, connector):
        """Schedule a reconnect and notify the callback if retries ran out."""
        self.retry(connector)
        if self.retries > self.maxRetries:
            # Hand out a copy so a callback cannot mutate the shared
            # class-level payload.
            self.callback(dict(self._reconnect_error_payload))

    def clientConnectionFailed(self, connector, reason):
        """Retry after a failed connection attempt.

        :param connector: connector used for the attempt
        :param reason: failure describing why the attempt failed (unused)
        """
        self._retry_or_report(connector)

    def clientConnectionLost(self, connector, reason):
        """Retry after an established connection was lost.

        :param connector: connector used for the connection
        :param reason: failure describing why the connection ended (unused)
        """
        self._retry_or_report(connector)
class BinanceSocketManager(threading.Thread):
    """Thread that runs the Twisted reactor and manages Binance websockets.

    Each socket is identified by a connection key (its stream path), which
    the ``start_*`` methods return and ``stop_socket`` accepts.
    """

    # NOTE(review): the previous value 'wss://' had no host, so every stream
    # URL was invalid. Restored to the base endpoint given in the Binance
    # WebSocket Streams documentation - confirm before deploying.
    STREAM_URL = 'wss://stream.binance.com:9443/'

    _user_timeout = 30 * 60  # 30 minutes

    def __init__(self, client):
        """Initialise the BinanceSocketManager

        :param client: Binance API client
        :type client: binance.Client

        """
        # Thread.__init__ must run or start() raises RuntimeError.
        threading.Thread.__init__(self)
        self._conns = {}
        self._user_timer = None
        self._user_listen_key = None
        self._user_callback = None
        self._client = client

    def _start_socket(self, path, callback, prefix='ws/'):
        """Open a websocket for ``path`` unless one is already registered.

        :param path: stream path, also used as the connection key
        :type path: str
        :param callback: callback function to handle messages
        :type callback: function
        :param prefix: URL segment placed between STREAM_URL and the path
        :type prefix: str

        :returns: connection key string if successful, False otherwise

        """
        if path in self._conns:
            return False

        factory_url = self.STREAM_URL + prefix + path
        factory = BinanceClientFactory(factory_url)
        factory.protocol = BinanceClientProtocol
        factory.callback = callback
        factory.reconnect = True
        context_factory = ssl.ClientContextFactory()

        self._conns[path] = connectWS(factory, context_factory)
        return path

    def start_depth_socket(self, symbol, callback, depth=None):
        """Start a websocket for symbol market depth returning either a diff or a partial book

        :param symbol: required
        :type symbol: str
        :param callback: callback function to handle messages
        :type callback: function
        :param depth: optional Number of depth entries to return, default None. If passed returns a partial book instead of a diff
        :type depth: enum

        :returns: connection key string if successful, False otherwise

        Partial Message Format

        .. code-block:: python

            {
                "lastUpdateId": 160,  # Last update ID
                "bids": [             # Bids to be updated
                    [
                        "0.0024",     # price level to be updated
                        "10",         # quantity
                        []            # ignore
                    ]
                ],
                "asks": [             # Asks to be updated
                    [
                        "0.0026",     # price level to be updated
                        "100",        # quantity
                        []            # ignore
                    ]
                ]
            }

        Diff Message Format

        .. code-block:: python

            {
                "e": "depthUpdate",  # Event type
                "E": 123456789,      # Event time
                "s": "BNBBTC",       # Symbol
                "U": 157,            # First update ID in event
                "u": 160,            # Final update ID in event
                "b": [               # Bids to be updated
                    [
                        "0.0024",    # price level to be updated
                        "10",        # quantity
                        []           # ignore
                    ]
                ],
                "a": [               # Asks to be updated
                    [
                        "0.0026",    # price level to be updated
                        "100",       # quantity
                        []           # ignore
                    ]
                ]
            }

        """
        socket_name = symbol.lower() + '@depth'
        # A depth of '1' is treated like no depth: the diff stream is used.
        if depth and depth != '1':
            socket_name = '{}{}'.format(socket_name, depth)
        return self._start_socket(socket_name, callback)

    def start_kline_socket(self, symbol, callback, interval=Client.KLINE_INTERVAL_1MINUTE):
        """Start a websocket for symbol kline data

        :param symbol: required
        :type symbol: str
        :param callback: callback function to handle messages
        :type callback: function
        :param interval: Kline interval, default KLINE_INTERVAL_1MINUTE
        :type interval: enum

        :returns: connection key string if successful, False otherwise

        Message Format

        .. code-block:: python

            {
                "e": "kline",                     # event type
                "E": 1499404907056,               # event time
                "s": "ETHBTC",                    # symbol
                "k": {
                    "t": 1499404860000,           # start time of this bar
                    "T": 1499404919999,           # end time of this bar
                    "s": "ETHBTC",                # symbol
                    "i": "1m",                    # interval
                    "f": 77462,                   # first trade id
                    "L": 77465,                   # last trade id
                    "o": "0.10278577",            # open
                    "c": "0.10278645",            # close
                    "h": "0.10278712",            # high
                    "l": "0.10278518",            # low
                    "v": "17.47929838",           # volume
                    "n": 4,                       # number of trades
                    "x": false,                   # whether this bar is final
                    "q": "1.79662878",            # quote volume
                    "V": "2.34879839",            # volume of active buy
                    "Q": "0.24142166",            # quote volume of active buy
                    "B": "13279784.01349473"      # can be ignored
                }
            }

        """
        socket_name = '{}@kline_{}'.format(symbol.lower(), interval)
        return self._start_socket(socket_name, callback)

    def start_trade_socket(self, symbol, callback):
        """Start a websocket for symbol trade data

        :param symbol: required
        :type symbol: str
        :param callback: callback function to handle messages
        :type callback: function

        :returns: connection key string if successful, False otherwise

        Message Format

        .. code-block:: python

            {
                "e": "trade",     # Event type
                "E": 123456789,   # Event time
                "s": "BNBBTC",    # Symbol
                "t": 12345,       # Trade ID
                "p": "0.001",     # Price
                "q": "100",       # Quantity
                "b": 88,          # Buyer order Id
                "a": 50,          # Seller order Id
                "T": 123456785,   # Trade time
                "m": true,        # Is the buyer the market maker?
                "M": true         # Ignore.
            }

        """
        return self._start_socket(symbol.lower() + '@trade', callback)

    def start_aggtrade_socket(self, symbol, callback):
        """Start a websocket for symbol aggregate trade data

        :param symbol: required
        :type symbol: str
        :param callback: callback function to handle messages
        :type callback: function

        :returns: connection key string if successful, False otherwise

        Message Format

        .. code-block:: python

            {
                "e": "aggTrade",      # event type
                "E": 1499405254326,   # event time
                "s": "ETHBTC",        # symbol
                "a": 70232,           # aggregated tradeid
                "p": "0.10281118",    # price
                "q": "8.15632997",    # quantity
                "f": 77489,           # first breakdown trade id
                "l": 77489,           # last breakdown trade id
                "T": 1499405254324,   # trade time
                "m": false,           # whether buyer is a maker
                "M": true             # can be ignored
            }

        """
        return self._start_socket(symbol.lower() + '@aggTrade', callback)

    def start_symbol_ticker_socket(self, symbol, callback):
        """Start a websocket for a symbol's ticker data

        :param symbol: required
        :type symbol: str
        :param callback: callback function to handle messages
        :type callback: function

        :returns: connection key string if successful, False otherwise

        Message Format

        .. code-block:: python

            {
                "e": "24hrTicker",  # Event type
                "E": 123456789,     # Event time
                "s": "BNBBTC",      # Symbol
                "p": "0.0015",      # Price change
                "P": "250.00",      # Price change percent
                "w": "0.0018",      # Weighted average price
                "x": "0.0009",      # Previous day's close price
                "c": "0.0025",      # Current day's close price
                "Q": "10",          # Close trade's quantity
                "b": "0.0024",      # Best bid price
                "B": "10",          # Best bid quantity
                "a": "0.0026",      # Best ask price
                "A": "100",         # Best ask quantity
                "o": "0.0010",      # Open price
                "h": "0.0025",      # High price
                "l": "0.0010",      # Low price
                "v": "10000",       # Total traded base asset volume
                "q": "18",          # Total traded quote asset volume
                "O": 0,             # Statistics open time
                "C": 86400000,      # Statistics close time
                "F": 0,             # First trade ID
                "L": 18150,         # Last trade Id
                "n": 18151          # Total number of trades
            }

        """
        return self._start_socket(symbol.lower() + '@ticker', callback)

    def start_ticker_socket(self, callback):
        """Start a websocket for all ticker data

        By default all markets are included in an array.

        :param callback: callback function to handle messages
        :type callback: function

        :returns: connection key string if successful, False otherwise

        Message Format

        .. code-block:: python

            [
                {
                    'F': 278610,
                    'o': '0.07393000',
                    's': 'BCCBTC',
                    'C': 1509622420916,
                    'b': '0.07800800',
                    'l': '0.07160300',
                    'h': '0.08199900',
                    'L': 287722,
                    'P': '6.694',
                    'Q': '0.10000000',
                    'q': '1202.67106335',
                    'p': '0.00494900',
                    'O': 1509536020916,
                    'a': '0.07887800',
                    'n': 9113,
                    'B': '1.00000000',
                    'c': '0.07887900',
                    'x': '0.07399600',
                    'w': '0.07639068',
                    'A': '2.41900000',
                    'v': '15743.68900000'
                }
            ]

        """
        return self._start_socket('!ticker@arr', callback)

    def start_multiplex_socket(self, streams, callback):
        """Start a multiplexed socket using a list of socket names.
        User stream sockets can not be included.

        Symbols in socket name must be lowercase i.e bnbbtc@aggTrade, neobtc@ticker

        Combined stream events are wrapped as follows: {"stream":"<streamName>","data":<rawPayload>}

        :param streams: list of stream names in lower case
        :type streams: list
        :param callback: callback function to handle messages
        :type callback: function

        :returns: connection key string if successful, False otherwise

        Message Format - see Binance API docs for all types

        """
        stream_path = 'streams={}'.format('/'.join(streams))
        return self._start_socket(stream_path, callback, 'stream?')

    def start_user_socket(self, callback):
        """Start a websocket for user data

        Any socket already open for the current listen key is stopped first.

        :param callback: callback function to handle messages
        :type callback: function

        :returns: connection key string if successful, False otherwise

        Message Format - see Binance API docs for all types

        """
        if self._user_listen_key:
            # cleanup any sockets with this key; iterate over a copy because
            # stop_socket removes entries from self._conns
            for conn_key in list(self._conns):
                if len(conn_key) >= 60 and conn_key[:60] == self._user_listen_key:
                    self.stop_socket(conn_key)
                    break

        self._user_listen_key = self._client.stream_get_listen_key()
        self._user_callback = callback
        conn_key = self._start_socket(self._user_listen_key, callback)
        if conn_key:
            # start timer to keep socket alive
            self._start_user_timer()

        return conn_key

    def _start_user_timer(self):
        """(Re)start the timer that triggers the user-stream keepalive."""
        # Cancel a pending timer first so repeated calls never leave two
        # keepalive timers running.
        if self._user_timer is not None:
            self._user_timer.cancel()
        self._user_timer = threading.Timer(self._user_timeout, self._keepalive_user_socket)
        # Daemon timer so a pending keepalive does not block interpreter exit.
        self._user_timer.daemon = True
        self._user_timer.start()

    def _keepalive_user_socket(self):
        """Refresh the listen key and reconnect the user socket if it changed."""
        listen_key = self._client.stream_get_listen_key()
        # check if the key changed and, if so, restart the user socket on it
        if listen_key != self._user_listen_key:
            self.start_user_socket(self._user_callback)
        self._start_user_timer()

    def stop_socket(self, conn_key):
        """Stop a websocket given the connection key

        :param conn_key: Socket connection key
        :type conn_key: string

        :returns: connection key string if successful, False otherwise

        """
        if conn_key not in self._conns:
            return False

        # disable reconnecting if we are closing: swap in a plain factory so
        # the reconnecting one is not notified of the disconnect
        self._conns[conn_key].factory = WebSocketClientFactory(self.STREAM_URL + 'tmp_path')
        self._conns[conn_key].disconnect()
        del self._conns[conn_key]

        # check if we have a user stream socket
        if len(conn_key) >= 60 and conn_key[:60] == self._user_listen_key:
            self._stop_user_socket()

        return conn_key

    def _stop_user_socket(self):
        """Cancel the keepalive timer and close the user data stream."""
        if not self._user_listen_key:
            return
        # stop the timer (it may never have been started if the socket
        # failed to open)
        if self._user_timer is not None:
            self._user_timer.cancel()
        self._user_timer = None
        # close the stream
        self._client.stream_close(listenKey=self._user_listen_key)
        self._user_listen_key = None

    def run(self):
        """Thread entry point: run the Twisted reactor."""
        try:
            # Signal handlers can only be installed from the main thread,
            # and this runs in a worker thread.
            reactor.run(installSignalHandlers=False)
        except ReactorAlreadyRunning:
            # Ignore error about reactor already running
            pass

    def close(self):
        """Close all connections

        """
        # Iterate over a copy; stop_socket deletes from self._conns.
        for key in list(self._conns):
            self.stop_socket(key)

        self._conns = {}
# End of gist source.