Last active
June 10, 2018 12:02
-
-
Save aljp/cb7a85e4f3f1352bbf66165150961892 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Basic SocketIO implementation that waits for orderbook changes from btcmarkets and enqueue's the ask/bid price data. | |
""" | |
import os | |
from decimal import Decimal | |
import re | |
import logging | |
import asyncio | |
import json | |
import websockets | |
import datetime | |
import django | |
import pytz | |
from collections import namedtuple | |
from redis import Redis | |
from rq import Queue | |
from bot.bot import update_positions | |
logging.basicConfig() | |
logger = logging.getLogger(__name__) | |
CURRENCY_ASSET_PAIRS = ( | |
("AUD", "XRP"), | |
# ("AUD", "BTC"), | |
# ("AUD", "LTC"), | |
# ("AUD", "ETH"), | |
# ("BTC", "XRP"), | |
# ("BTC", "LTC"), | |
# ("BTC", "ETH"), | |
) | |
Bid = namedtuple("Bid", ["price", "volume", "magic_code"]) | |
Ask = namedtuple("Ask", ["price", "volume", "magic_code"]) | |
def normalize_price(price) -> Decimal: | |
"""Convert BTC markets prices to decimal dollars/btc""" | |
return Decimal(price * 10 ** -8) | |
def normalize_volume(volume) -> Decimal: | |
"""Convert BTC markets volume to decimal dollars/btc""" | |
return Decimal(volume * 10 ** -8) | |
async def subscribe(websocket: websockets.WebSocketClientProtocol, channel: str): | |
print(f"Subscribed to {channel}") | |
res = await websocket.send(f'42["join", "{channel}"]') | |
return res | |
async def heartbeat(websocket, interval): | |
while True: | |
logger.debug("Sending heartbeat") | |
await websocket.send("2") | |
await asyncio.sleep(interval) | |
async def main(uri: str): | |
redis_url_conf = re.match( | |
r"redis://(?P<host>.+):(?P<port>\d+)(/(?P<db>\d+))?", os.environ["REDIS_URL"] | |
) | |
redis_con = Redis( | |
host=redis_url_conf.group("host"), | |
port=redis_url_conf.group("port"), | |
db=redis_url_conf.group("db") or 0, | |
) | |
# Define the queues we will be using here. | |
queues = { | |
f"btcmarkets_{asset.lower()}_{currency.lower()}": Queue( | |
connection=redis_con, name=f"btcmarkets_{asset.lower()}_{currency.lower()}" | |
) | |
for currency, asset in CURRENCY_ASSET_PAIRS | |
} | |
while True: | |
try: | |
async with websockets.connect(uri) as websocket: # type: websockets.client.WebSocketClientProtocol | |
msg = await websocket.recv() | |
initial_json = json.loads(msg[1:]) | |
sid = initial_json.get("sid") | |
ping_interval = int(initial_json["pingInterval"] / 1000) | |
ping_timeout = int(initial_json.get("pingTimeout", 0) / 1000) or int( | |
ping_interval * 2.5 | |
) | |
try: | |
msg = await asyncio.wait_for(websocket.recv(), timeout=ping_timeout) | |
assert msg == "40", "Failed to connect" | |
except ( | |
AssertionError, | |
asyncio.TimeoutError, | |
websockets.exceptions.ConnectionClosed, | |
) as exc: | |
logger.error(exc) | |
break | |
for currency, asset in CURRENCY_ASSET_PAIRS: | |
await subscribe(websocket, f"Orderbook_{asset}{currency}") | |
hb_future = asyncio.ensure_future(heartbeat(websocket, ping_interval)) | |
while True: | |
try: | |
logger.debug("Awaiting websocket message") | |
msg = await asyncio.wait_for( | |
websocket.recv(), timeout=ping_timeout | |
) | |
logger.debug(msg) | |
logger.debug("websocket received") | |
except ( | |
asyncio.TimeoutError, | |
websockets.exceptions.ConnectionClosed, | |
) as exc: | |
logger.error(exc) | |
try: | |
pong = await websocket.ping() | |
await asyncio.wait_for(pong, timeout=ping_timeout) | |
logger.debug("Ping OK, keeping connection alive") | |
continue | |
except asyncio.TimeoutError: | |
hb_future.cancel() | |
await asyncio.sleep(ping_timeout) | |
break # inner loop | |
if msg[:2] == "42": | |
payload = json.loads(msg[2:]) | |
event_name, event_data = payload[0], payload[1] | |
if event_name == "OrderBookChange": | |
currency = event_data.get("currency") | |
asset = event_data.get("instrument") | |
timestamp = event_data.get("timestamp") | |
market_id = event_data.get("marketId") | |
snapshot_id = event_data.get("snapshotId") | |
asks = list( | |
map( | |
lambda a: ( | |
normalize_price(a[0]), | |
normalize_volume(a[1]), | |
), | |
event_data.get("asks"), | |
) | |
) | |
bids = list( | |
map( | |
lambda a: ( | |
normalize_price(a[0]), | |
normalize_volume(a[1]), | |
), | |
event_data.get("bids"), | |
) | |
) | |
orderbook_timestamp = datetime.datetime.fromtimestamp( | |
timestamp / 1000, tz=pytz.utc | |
) | |
if currency == "AUD" and asset == "XRP": | |
queue = queues[ | |
f"btcmarkets_{asset.lower()}_{currency.lower()}" | |
] | |
queue.enqueue( | |
update_positions, | |
currency, | |
asset, | |
asks, | |
bids, | |
orderbook_timestamp=orderbook_timestamp, | |
) | |
except ConnectionRefusedError as exc: | |
logger.error(exc) | |
if __name__ == "__main__": | |
event_loop = asyncio.get_event_loop() | |
try: | |
event_loop.run_until_complete( | |
main("wss://socket.btcmarkets.net/socket.io/?EIO=3&transport=websocket") | |
) | |
finally: | |
event_loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment