Skip to content

Instantly share code, notes, and snippets.

@aljp
Last active June 10, 2018 12:02
Show Gist options
  • Save aljp/cb7a85e4f3f1352bbf66165150961892 to your computer and use it in GitHub Desktop.
Save aljp/cb7a85e4f3f1352bbf66165150961892 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""
Basic SocketIO implementation that waits for orderbook changes from btcmarkets and enqueue's the ask/bid price data.
"""
import os
from decimal import Decimal
import re
import logging
import asyncio
import json
import websockets
import datetime
import django
import pytz
from collections import namedtuple
from redis import Redis
from rq import Queue
from bot.bot import update_positions
logging.basicConfig()
logger = logging.getLogger(__name__)
CURRENCY_ASSET_PAIRS = (
("AUD", "XRP"),
# ("AUD", "BTC"),
# ("AUD", "LTC"),
# ("AUD", "ETH"),
# ("BTC", "XRP"),
# ("BTC", "LTC"),
# ("BTC", "ETH"),
)
Bid = namedtuple("Bid", ["price", "volume", "magic_code"])
Ask = namedtuple("Ask", ["price", "volume", "magic_code"])
def normalize_price(price) -> Decimal:
"""Convert BTC markets prices to decimal dollars/btc"""
return Decimal(price * 10 ** -8)
def normalize_volume(volume) -> Decimal:
"""Convert BTC markets volume to decimal dollars/btc"""
return Decimal(volume * 10 ** -8)
async def subscribe(websocket: websockets.WebSocketClientProtocol, channel: str):
print(f"Subscribed to {channel}")
res = await websocket.send(f'42["join", "{channel}"]')
return res
async def heartbeat(websocket, interval):
while True:
logger.debug("Sending heartbeat")
await websocket.send("2")
await asyncio.sleep(interval)
async def main(uri: str):
redis_url_conf = re.match(
r"redis://(?P<host>.+):(?P<port>\d+)(/(?P<db>\d+))?", os.environ["REDIS_URL"]
)
redis_con = Redis(
host=redis_url_conf.group("host"),
port=redis_url_conf.group("port"),
db=redis_url_conf.group("db") or 0,
)
# Define the queues we will be using here.
queues = {
f"btcmarkets_{asset.lower()}_{currency.lower()}": Queue(
connection=redis_con, name=f"btcmarkets_{asset.lower()}_{currency.lower()}"
)
for currency, asset in CURRENCY_ASSET_PAIRS
}
while True:
try:
async with websockets.connect(uri) as websocket: # type: websockets.client.WebSocketClientProtocol
msg = await websocket.recv()
initial_json = json.loads(msg[1:])
sid = initial_json.get("sid")
ping_interval = int(initial_json["pingInterval"] / 1000)
ping_timeout = int(initial_json.get("pingTimeout", 0) / 1000) or int(
ping_interval * 2.5
)
try:
msg = await asyncio.wait_for(websocket.recv(), timeout=ping_timeout)
assert msg == "40", "Failed to connect"
except (
AssertionError,
asyncio.TimeoutError,
websockets.exceptions.ConnectionClosed,
) as exc:
logger.error(exc)
break
for currency, asset in CURRENCY_ASSET_PAIRS:
await subscribe(websocket, f"Orderbook_{asset}{currency}")
hb_future = asyncio.ensure_future(heartbeat(websocket, ping_interval))
while True:
try:
logger.debug("Awaiting websocket message")
msg = await asyncio.wait_for(
websocket.recv(), timeout=ping_timeout
)
logger.debug(msg)
logger.debug("websocket received")
except (
asyncio.TimeoutError,
websockets.exceptions.ConnectionClosed,
) as exc:
logger.error(exc)
try:
pong = await websocket.ping()
await asyncio.wait_for(pong, timeout=ping_timeout)
logger.debug("Ping OK, keeping connection alive")
continue
except asyncio.TimeoutError:
hb_future.cancel()
await asyncio.sleep(ping_timeout)
break # inner loop
if msg[:2] == "42":
payload = json.loads(msg[2:])
event_name, event_data = payload[0], payload[1]
if event_name == "OrderBookChange":
currency = event_data.get("currency")
asset = event_data.get("instrument")
timestamp = event_data.get("timestamp")
market_id = event_data.get("marketId")
snapshot_id = event_data.get("snapshotId")
asks = list(
map(
lambda a: (
normalize_price(a[0]),
normalize_volume(a[1]),
),
event_data.get("asks"),
)
)
bids = list(
map(
lambda a: (
normalize_price(a[0]),
normalize_volume(a[1]),
),
event_data.get("bids"),
)
)
orderbook_timestamp = datetime.datetime.fromtimestamp(
timestamp / 1000, tz=pytz.utc
)
if currency == "AUD" and asset == "XRP":
queue = queues[
f"btcmarkets_{asset.lower()}_{currency.lower()}"
]
queue.enqueue(
update_positions,
currency,
asset,
asks,
bids,
orderbook_timestamp=orderbook_timestamp,
)
except ConnectionRefusedError as exc:
logger.error(exc)
if __name__ == "__main__":
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(
main("wss://socket.btcmarkets.net/socket.io/?EIO=3&transport=websocket")
)
finally:
event_loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment