Skip to content

Instantly share code, notes, and snippets.

@melardev
Last active May 29, 2023 09:23
Show Gist options
  • Save melardev/d6c09d139a47f117d060596f9f6de7bb to your computer and use it in GitHub Desktop.
Save melardev/d6c09d139a47f117d060596f9f6de7bb to your computer and use it in GitHub Desktop.
import asyncio
import datetime
import json
import sys
import time
import pytz
from autobahn.asyncio.websocket import WebSocketClientFactory, WebSocketClientProtocol
# Market selection: pass "spot" (case-insensitive) as the first command-line
# argument to listen on the spot market; any other value, or no argument,
# selects the futures market.
if len(sys.argv) > 1 and sys.argv[1].lower() == 'spot':
    # Listen in spot market
    api_endpoint = 'https://api.binance.com/api/v3/aggTrades'  # spot
    ws_endpoint = 'wss://stream.binance.com/ws'
    ws_host = 'stream.binance.com'
    print('Using Spot')
else:
    # Listen in futures market
    api_endpoint = 'https://fapi.binance.com/fapi/v1/aggTrades'  # futures
    ws_endpoint = 'wss://fstream.binance.com/ws'
    ws_host = 'fstream.binance.com'
    print('Using futures')

# Wall-clock time (seconds since the epoch) of the most recent WebSocket
# message; MyClientProtocol.onMessage overwrites it on every message.
last_received = time.time()
# NOTE(review): `running` and `last_trades_statistics_time` are not read by
# any code visible in this file -- confirm whether they are still needed.
running = True
last_trades_statistics_time = time.time()
class MyClientProtocol(WebSocketClientProtocol):
    """WebSocket client that prints aggregate trades and a running CVD.

    When the connection opens it subscribes to a single ``aggTrade`` stream.
    For each trade message it prints the trade time, side, nominal size
    (price * quantity), base quantity and the cumulative volume delta (CVD):
    the sum of buy nominal sizes minus the sum of sell nominal sizes seen by
    this protocol instance.
    """

    # Stream requested in onOpen; override in a subclass to follow another
    # symbol.
    stream_name = 'tomousdt@aggTrade'
    # Trades whose nominal size is at least this value are tagged "big".
    big_trade_threshold = 7_000

    def __init__(self):
        super().__init__()
        self.tick = 0  # not read anywhere in this class
        self.cvd = 0  # running buy-minus-sell nominal size

    def onConnect(self, response):
        """Report the peer once the server has answered the handshake."""
        print("Server connected: {0}".format(response.peer))

    def onConnecting(self, transport_details):
        """Return None to ask for the default connection request."""
        return None  # ask for defaults

    def onOpen(self):
        """Send the SUBSCRIBE request for ``stream_name``."""
        print("WebSocket connection open.")
        message = json.dumps({
            "method": "SUBSCRIBE",
            "params": [
                self.stream_name,
            ],
            "id": 1,
        }).encode()
        self.sendMessage(message)

    def onMessage(self, payload, isBinary):
        """Decode one message, update the CVD and print the trade.

        Messages that are not valid JSON, lack the ``'e'`` event key, or lack
        usable trade fields are reported and skipped instead of raising, so a
        single bad frame cannot propagate an exception out of this callback.

        Args:
            payload: Raw message body as delivered by the WebSocket layer.
            isBinary: Frame type flag; not used, the payload is always parsed
                as JSON.
        """
        global last_received
        # Any frame counts as "received", even one that fails to parse.
        last_received = time.time()

        try:
            trade = json.loads(payload)
        except ValueError as exc:  # JSONDecodeError / UnicodeDecodeError
            print(f'Invalid message - {payload!r} ({exc})')
            return

        if not isinstance(trade, dict) or 'e' not in trade:
            # NOTE(review): the reply to the SUBSCRIBE request is expected to
            # look like {"result": null, "id": 1}; it is not a trade, so it
            # is reported separately -- confirm against the exchange docs.
            if isinstance(trade, dict) and 'result' in trade and 'id' in trade:
                print(f'Subscription response - {trade}')
            else:
                print(f'Invalid Trade - {trade}')
            return

        try:
            # 'T' is divided by 1000 before conversion, i.e. it is treated as
            # a millisecond timestamp.
            trade_time = datetime.datetime.fromtimestamp(trade['T'] / 1000, tz=pytz.UTC)
            base_quantity = float(trade['q'])
            nominal_size = float(trade['p']) * base_quantity
            is_sell = trade['m']
        except (KeyError, TypeError, ValueError) as exc:
            print(f'Invalid Trade - {trade} ({exc!r})')
            return

        big_str = 'big' if nominal_size >= self.big_trade_threshold else ''
        # A truthy 'm' is counted as a sell, otherwise as a buy.
        # NOTE(review): presumably 'm' means "buyer is the market maker" --
        # verify against the exchange documentation.
        if is_sell:
            side = 'sell'
            self.cvd -= nominal_size
        else:
            side = 'buy'
            self.cvd += nominal_size

        print(
            f'- {trade_time} - {big_str} {side} Trade: {format(round(nominal_size, 2), ",")}$, '
            f'{base_quantity} '
            f'- CVD: {format(round(self.cvd, 2), ",")}$')

    def onClose(self, wasClean, code, reason):
        """Report the reason the connection was closed."""
        print("WebSocket connection closed: {0}".format(reason))
def run():
    """Connect to ``ws_endpoint`` over TLS and process messages indefinitely.

    Builds a client factory that produces ``MyClientProtocol`` instances,
    opens the connection to ``ws_host`` on port 443 and then runs the event
    loop until it is stopped or an exception (for example KeyboardInterrupt)
    interrupts it. The loop is always closed on the way out.
    """
    # Create and install the loop explicitly: asyncio.get_event_loop() with
    # no running loop is deprecated. This is done before the factory is
    # built so that any library code looking up the current loop gets this
    # one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        factory = WebSocketClientFactory(ws_endpoint)
        factory.protocol = MyClientProtocol
        coro = loop.create_connection(factory, ws_host, 443, ssl=True)
        loop.run_until_complete(coro)
        loop.run_forever()
    finally:
        # Reached on connection failure and on Ctrl-C as well as on a normal
        # loop stop, so the loop's resources are always released.
        loop.close()
# Script entry point: start the listener only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment