Last active
January 14, 2024 22:07
-
-
Save vorandrew/72d2e690df548e93a37cd28e87c4639d to your computer and use it in GitHub Desktop.
tastytrade.com and DXFeed - stock quotes and OHLCs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import datetime as dt | |
import json | |
import logging | |
import re | |
import requests | |
import websockets | |
URL = "https://api.tastytrade.com" | |
log = logging.getLogger(__name__) | |
class DXFeed(): | |
M = 'm' | |
M5 = '5m' | |
M15 = '15m' | |
M30 = '30m' | |
H = 'h' | |
H4 = '4h' | |
D = 'd' | |
def __init__(self, token) -> None: | |
""" | |
Init DXFeed | |
""" | |
self.token = token | |
self._prices = {} | |
self._ohlc = {} | |
async def _send(self, msg): | |
await self.ws.send(json.dumps(msg)) | |
async def _recv(self): | |
msg = await self.ws.recv() | |
return json.loads(msg) | |
async def close(self): | |
self._task_heartbeat.cancel() | |
await self.ws.close() | |
async def init(self): | |
dx_token, uri = self._fetch_token() | |
self.ws = await websockets.connect(uri) | |
await self._send({ | |
"type": "SETUP", | |
"channel": 0, | |
"keepaliveTimeout": 60, | |
"acceptKeepaliveTimeout": 60, | |
"version": "0.1-js/1.0.0-beta.3" | |
}) | |
await self._recv() | |
await self._recv() | |
await self._send({ | |
"type": "AUTH", | |
"channel": 0, | |
"token": dx_token, | |
}) | |
res = await self._recv() | |
if res["state"] != "AUTHORIZED": | |
raise Exception(f"Error auth: {res}") | |
await self._send({ | |
"type": "CHANNEL_REQUEST", | |
"channel": 1, | |
"service": "FEED", | |
"parameters": {"contract": "AUTO"} | |
}) | |
res = await self._recv() | |
if res["type"] != "CHANNEL_OPENED": | |
raise Exception(f"Error channel: {res}") | |
await self._send({ | |
"channel": 1, | |
"type": "FEED_SETUP", | |
"acceptAggregationPeriod": 1, | |
"acceptDataFormat": "COMPACT", | |
"acceptEventFields": { | |
"Quote": [ | |
"eventSymbol", | |
"bidPrice", | |
"askPrice" | |
] | |
} | |
}) | |
await self._recv() | |
await self._send({ | |
"type": "CHANNEL_REQUEST", | |
"channel": 3, | |
"service": "FEED", | |
"parameters": {"contract": "AUTO"} | |
}) | |
res = await self._recv() | |
if res["type"] != "CHANNEL_OPENED": | |
raise Exception(f"Error channel: {res}") | |
await self._send({ | |
"channel": 3, | |
"type": "FEED_SETUP", | |
"acceptAggregationPeriod": 1, | |
"acceptDataFormat": "COMPACT", | |
"acceptEventFields": { | |
"Candle": [ | |
"eventType", | |
"eventSymbol", | |
"time", | |
"open", | |
"high", | |
"low", | |
"close", | |
] | |
} | |
}) | |
res = await self._recv() | |
if res["type"] != "FEED_CONFIG": | |
raise Exception(f"Error channel: {res}") | |
self._task_heartbeat = asyncio.create_task(self._heartbeat()) | |
async def loop(self): | |
async for msg in self.ws: | |
await self._process(msg) | |
async def _process(self, raw): | |
log.debug(f"Received: {raw}") | |
msg = json.loads(raw) | |
if msg["type"] == "FEED_DATA": | |
tmp = msg["data"] | |
data = tmp[1] | |
if tmp[0] == "Quote": | |
while len(data) > 0: | |
symbol, bid, ask = data[0:3] | |
data = data[3:] | |
self._prices[symbol] = { | |
"bid": float(bid), | |
"ask": float(ask), | |
} | |
log.debug(f"Quote: {symbol} {bid} {ask}") | |
if tmp[0] == "Candle": | |
while len(data) > 0: | |
_, symbol, t, o, h, l, c = data[0:7] | |
data = data[7:] | |
if o == 'NaN': | |
continue | |
match = re.match(r"([A-Z]+)\{=([^\}]+)\}", symbol) | |
if match is None: | |
log.error(f"Invalid symbol: {symbol}") | |
continue | |
symbol = match.group(1) | |
tf = match.group(2) | |
bar = { | |
"time": int(t / 1000), | |
"open": float(o), | |
"high": float(h), | |
"low": float(l), | |
"close": float(c), | |
} | |
if symbol not in self._ohlc: | |
self._ohlc[symbol] = {} | |
if tf not in self._ohlc[symbol]: | |
self._ohlc[symbol][tf] = {} | |
if bar["time"] not in self._ohlc[symbol][tf]: | |
self._ohlc[symbol][tf][bar["time"]] = bar | |
log.debug(f"OHLC: {bar} {symbol} {tf}") | |
async def _heartbeat(self): | |
while True: | |
await asyncio.sleep(59) | |
await self._send({ | |
"type": "KEEPALIVE", | |
"channel": 0, | |
}) | |
log.debug("Sent heartbeat") | |
async def _ticker_subscribe(self, symbol: str): | |
await self._send({ | |
"channel": 1, | |
"type": "FEED_SUBSCRIPTION", | |
"add": [{"type": "Quote", "symbol": symbol}] | |
}) | |
async def _ticker_remove(self, symbol): | |
await self._send({ | |
"channel": 1, | |
"type": "FEED_SUBSCRIPTION", | |
"remove": [{"type": "Quote", "symbol": symbol}] | |
}) | |
async def _ohlc_subscribe(self, symbol, tf=M15, days=1, from_time: dt.datetime | None = None): | |
from_time = dt.datetime.now() - dt.timedelta(days=days) if from_time is None else from_time | |
return await self._send({ | |
"channel": 3, | |
"type": "FEED_SUBSCRIPTION", | |
"add": [ | |
{ | |
"type": "Candle", | |
"symbol": "{}{{={}}}".format(symbol, tf), | |
"fromTime": int(from_time.timestamp() * 1000), | |
} | |
] | |
}) | |
async def _ohlc_remove(self, symbol, tf=M15): | |
return await self._send({ | |
"channel": 3, | |
"type": "FEED_SUBSCRIPTION", | |
"remove": [ | |
{ | |
"type": "Candle", | |
"symbol": "{}{{={}}}".format(symbol, tf), | |
} | |
] | |
}) | |
def _get_ohlc(self, symbol, tf=M15, days=1, from_time: dt.datetime | None = None) -> list[dict]: | |
from_time = dt.datetime.now() - dt.timedelta(days=days) if from_time is None else from_time | |
from_time_int = int(from_time.timestamp()) | |
timestamps = [] | |
for t, bar in self._ohlc[symbol][tf].items(): | |
if t >= from_time_int: | |
timestamps.append(t) | |
if len(timestamps) == 0: | |
return [] | |
sorted(timestamps, reverse=True) | |
bars = [] | |
for t in timestamps: | |
bar = self._ohlc[symbol][tf][t] | |
bars.append({**bar, "symbol": symbol, "tf": tf}) | |
return bars | |
async def ohlc(self, symbol, tf=M15, days=1, from_time: dt.datetime | None = None) -> list[dict]: | |
await self._ohlc_subscribe(symbol, tf, days, from_time) | |
for _ in range(10): | |
try: | |
bars = self._get_ohlc(symbol, tf, days, from_time) | |
return bars | |
except KeyError: | |
await asyncio.sleep(0.1) | |
raise Exception("Timeout waiting for quote") | |
async def quote(self, symbol) -> dict: | |
if symbol in self._prices: | |
return self._prices[symbol] | |
await self._ticker_subscribe(symbol) | |
for _ in range(10): | |
if symbol in self._prices: | |
return self._prices[symbol] | |
await asyncio.sleep(0.1) | |
raise Exception("Timeout waiting for quote") | |
def _fetch_token(self): | |
"""Fetches DXFeed token""" | |
try: | |
req = requests.get( | |
f"{URL}/api-quote-tokens", | |
headers={ | |
"Content-Type": "application/json", | |
"Authorization": self.token, | |
} | |
) | |
if req.status_code != 200: | |
raise Exception(f"Error token: {req.text}") | |
res = req.json() | |
token = res.get("data").get("token") | |
url = res.get("data").get("dxlink-url") | |
except requests.exceptions.HTTPError as err: | |
raise Exception(f"Error getting token: {err}") | |
return token, url |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import datetime as dt | |
import os | |
import pytest | |
from dotenv import load_dotenv | |
from dxfeed import DXFeed | |
load_dotenv() | |
token = os.getenv('TASTY_TOKEN') | |
@pytest.mark.asyncio | |
async def test_dxfeed(): | |
dx = DXFeed(token) | |
await dx.init() | |
await dx.close() | |
@pytest.mark.asyncio | |
async def test_quote(): | |
dx = DXFeed(token) | |
await dx.init() | |
loop = asyncio.create_task(dx.loop()) | |
quote = await dx.quote('AAPL') | |
assert quote['bid'] > 0 | |
assert quote['ask'] > 0 | |
await dx.close() | |
loop.cancel() | |
@pytest.mark.asyncio | |
async def test_ohlc(): | |
dx = DXFeed(token) | |
symbol = 'AAPL' | |
await dx.init() | |
loop = asyncio.create_task(dx.loop()) | |
ohlc = await dx.ohlc(symbol, DXFeed.H4, days=3) | |
await dx.close() | |
loop.cancel() | |
assert len(ohlc) > 0 | |
assert ohlc[0]['open'] > 0 | |
assert ohlc[0]['high'] > 0 | |
assert ohlc[0]['low'] > 0 | |
assert ohlc[0]['close'] > 0 | |
assert ohlc[0]['time'] < int(dt.datetime.now().timestamp()) | |
assert ohlc[0]['symbol'] == symbol |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment