Skip to content

Instantly share code, notes, and snippets.

@vorandrew
Last active January 14, 2024 22:07
Show Gist options
  • Save vorandrew/72d2e690df548e93a37cd28e87c4639d to your computer and use it in GitHub Desktop.
Save vorandrew/72d2e690df548e93a37cd28e87c4639d to your computer and use it in GitHub Desktop.
tastytrade.com and DXFeed - stock quotes and OHLCs
import asyncio
import datetime as dt
import json
import logging
import re
import requests
import websockets
# Base address of the tastytrade REST API.
URL: str = "https://api.tastytrade.com"
# Module-level logger, named after the importing module.
log: logging.Logger = logging.getLogger(__name__)
class DXFeed():
M = 'm'
M5 = '5m'
M15 = '15m'
M30 = '30m'
H = 'h'
H4 = '4h'
D = 'd'
def __init__(self, token) -> None:
"""
Init DXFeed
"""
self.token = token
self._prices = {}
self._ohlc = {}
async def _send(self, msg):
await self.ws.send(json.dumps(msg))
async def _recv(self):
msg = await self.ws.recv()
return json.loads(msg)
async def close(self):
self._task_heartbeat.cancel()
await self.ws.close()
async def init(self):
dx_token, uri = self._fetch_token()
self.ws = await websockets.connect(uri)
await self._send({
"type": "SETUP",
"channel": 0,
"keepaliveTimeout": 60,
"acceptKeepaliveTimeout": 60,
"version": "0.1-js/1.0.0-beta.3"
})
await self._recv()
await self._recv()
await self._send({
"type": "AUTH",
"channel": 0,
"token": dx_token,
})
res = await self._recv()
if res["state"] != "AUTHORIZED":
raise Exception(f"Error auth: {res}")
await self._send({
"type": "CHANNEL_REQUEST",
"channel": 1,
"service": "FEED",
"parameters": {"contract": "AUTO"}
})
res = await self._recv()
if res["type"] != "CHANNEL_OPENED":
raise Exception(f"Error channel: {res}")
await self._send({
"channel": 1,
"type": "FEED_SETUP",
"acceptAggregationPeriod": 1,
"acceptDataFormat": "COMPACT",
"acceptEventFields": {
"Quote": [
"eventSymbol",
"bidPrice",
"askPrice"
]
}
})
await self._recv()
await self._send({
"type": "CHANNEL_REQUEST",
"channel": 3,
"service": "FEED",
"parameters": {"contract": "AUTO"}
})
res = await self._recv()
if res["type"] != "CHANNEL_OPENED":
raise Exception(f"Error channel: {res}")
await self._send({
"channel": 3,
"type": "FEED_SETUP",
"acceptAggregationPeriod": 1,
"acceptDataFormat": "COMPACT",
"acceptEventFields": {
"Candle": [
"eventType",
"eventSymbol",
"time",
"open",
"high",
"low",
"close",
]
}
})
res = await self._recv()
if res["type"] != "FEED_CONFIG":
raise Exception(f"Error channel: {res}")
self._task_heartbeat = asyncio.create_task(self._heartbeat())
async def loop(self):
async for msg in self.ws:
await self._process(msg)
async def _process(self, raw):
log.debug(f"Received: {raw}")
msg = json.loads(raw)
if msg["type"] == "FEED_DATA":
tmp = msg["data"]
data = tmp[1]
if tmp[0] == "Quote":
while len(data) > 0:
symbol, bid, ask = data[0:3]
data = data[3:]
self._prices[symbol] = {
"bid": float(bid),
"ask": float(ask),
}
log.debug(f"Quote: {symbol} {bid} {ask}")
if tmp[0] == "Candle":
while len(data) > 0:
_, symbol, t, o, h, l, c = data[0:7]
data = data[7:]
if o == 'NaN':
continue
match = re.match(r"([A-Z]+)\{=([^\}]+)\}", symbol)
if match is None:
log.error(f"Invalid symbol: {symbol}")
continue
symbol = match.group(1)
tf = match.group(2)
bar = {
"time": int(t / 1000),
"open": float(o),
"high": float(h),
"low": float(l),
"close": float(c),
}
if symbol not in self._ohlc:
self._ohlc[symbol] = {}
if tf not in self._ohlc[symbol]:
self._ohlc[symbol][tf] = {}
if bar["time"] not in self._ohlc[symbol][tf]:
self._ohlc[symbol][tf][bar["time"]] = bar
log.debug(f"OHLC: {bar} {symbol} {tf}")
async def _heartbeat(self):
while True:
await asyncio.sleep(59)
await self._send({
"type": "KEEPALIVE",
"channel": 0,
})
log.debug("Sent heartbeat")
async def _ticker_subscribe(self, symbol: str):
await self._send({
"channel": 1,
"type": "FEED_SUBSCRIPTION",
"add": [{"type": "Quote", "symbol": symbol}]
})
async def _ticker_remove(self, symbol):
await self._send({
"channel": 1,
"type": "FEED_SUBSCRIPTION",
"remove": [{"type": "Quote", "symbol": symbol}]
})
async def _ohlc_subscribe(self, symbol, tf=M15, days=1, from_time: dt.datetime | None = None):
from_time = dt.datetime.now() - dt.timedelta(days=days) if from_time is None else from_time
return await self._send({
"channel": 3,
"type": "FEED_SUBSCRIPTION",
"add": [
{
"type": "Candle",
"symbol": "{}{{={}}}".format(symbol, tf),
"fromTime": int(from_time.timestamp() * 1000),
}
]
})
async def _ohlc_remove(self, symbol, tf=M15):
return await self._send({
"channel": 3,
"type": "FEED_SUBSCRIPTION",
"remove": [
{
"type": "Candle",
"symbol": "{}{{={}}}".format(symbol, tf),
}
]
})
def _get_ohlc(self, symbol, tf=M15, days=1, from_time: dt.datetime | None = None) -> list[dict]:
from_time = dt.datetime.now() - dt.timedelta(days=days) if from_time is None else from_time
from_time_int = int(from_time.timestamp())
timestamps = []
for t, bar in self._ohlc[symbol][tf].items():
if t >= from_time_int:
timestamps.append(t)
if len(timestamps) == 0:
return []
sorted(timestamps, reverse=True)
bars = []
for t in timestamps:
bar = self._ohlc[symbol][tf][t]
bars.append({**bar, "symbol": symbol, "tf": tf})
return bars
async def ohlc(self, symbol, tf=M15, days=1, from_time: dt.datetime | None = None) -> list[dict]:
await self._ohlc_subscribe(symbol, tf, days, from_time)
for _ in range(10):
try:
bars = self._get_ohlc(symbol, tf, days, from_time)
return bars
except KeyError:
await asyncio.sleep(0.1)
raise Exception("Timeout waiting for quote")
async def quote(self, symbol) -> dict:
if symbol in self._prices:
return self._prices[symbol]
await self._ticker_subscribe(symbol)
for _ in range(10):
if symbol in self._prices:
return self._prices[symbol]
await asyncio.sleep(0.1)
raise Exception("Timeout waiting for quote")
def _fetch_token(self):
"""Fetches DXFeed token"""
try:
req = requests.get(
f"{URL}/api-quote-tokens",
headers={
"Content-Type": "application/json",
"Authorization": self.token,
}
)
if req.status_code != 200:
raise Exception(f"Error token: {req.text}")
res = req.json()
token = res.get("data").get("token")
url = res.get("data").get("dxlink-url")
except requests.exceptions.HTTPError as err:
raise Exception(f"Error getting token: {err}")
return token, url
import asyncio
import datetime as dt
import os
import pytest
from dotenv import load_dotenv
from dxfeed import DXFeed
# Load variables from a local .env file into the process environment, then
# read the token the tests pass to DXFeed (None when TASTY_TOKEN is unset).
load_dotenv()
token = os.environ.get('TASTY_TOKEN')
@pytest.mark.asyncio
async def test_dxfeed():
    """Smoke test: init() completes and the connection can be closed."""
    feed = DXFeed(token)
    await feed.init()
    await feed.close()
@pytest.mark.asyncio
async def test_quote():
    """A quote for AAPL arrives with positive bid and ask."""
    dx = DXFeed(token)
    await dx.init()
    loop = asyncio.create_task(dx.loop())
    try:
        quote = await dx.quote('AAPL')
        assert quote['bid'] > 0
        assert quote['ask'] > 0
    finally:
        # Always release the websocket and the reader task, even when the
        # quote times out or an assertion fails.
        await dx.close()
        loop.cancel()
@pytest.mark.asyncio
async def test_ohlc():
    """Three days of 4h AAPL candles come back with sane OHLC values."""
    dx = DXFeed(token)
    symbol = 'AAPL'
    await dx.init()
    loop = asyncio.create_task(dx.loop())
    try:
        ohlc = await dx.ohlc(symbol, DXFeed.H4, days=3)
    finally:
        # Always release the websocket and the reader task, even when the
        # candle request times out.
        await dx.close()
        loop.cancel()
    assert len(ohlc) > 0
    assert ohlc[0]['open'] > 0
    assert ohlc[0]['high'] > 0
    assert ohlc[0]['low'] > 0
    assert ohlc[0]['close'] > 0
    assert ohlc[0]['time'] < int(dt.datetime.now().timestamp())
    assert ohlc[0]['symbol'] == symbol
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment