Skip to content

Instantly share code, notes, and snippets.

@dgnsrekt
Created June 5, 2020 01:33
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dgnsrekt/2a373ba7a69b98c23cf385ff6f363ae0 to your computer and use it in GitHub Desktop.
Save dgnsrekt/2a373ba7a69b98c23cf385ff6f363ae0 to your computer and use it in GitHub Desktop.
Playing around with tdameritrade stream api.
import pandas
from decouple import config
from time import sleep
DATABASE_URI = config("DATABASE_URI")
while True:
table = pandas.read_sql_table("chartdata", DATABASE_URI)
columns = ["CHART_TIME", "OPEN_PRICE", "HIGH_PRICE", "LOW_PRICE", "CLOSE_PRICE", "VOLUME"]
es = table[table["key"] == "/GC"][columns]
es = es.set_index("CHART_TIME")
rsamp = es.resample("1H").agg(
{
"OPEN_PRICE": "first",
"HIGH_PRICE": "max",
"LOW_PRICE": "min",
"CLOSE_PRICE": "last",
"VOLUME": "sum",
}
)
print(rsamp)
sleep(30)
from tda import auth
from tda.client import Client
from tda.streaming import StreamClient
from paths import PROJECT_ROOT, LOCAL_CHROMEDRIVER_LOCATION
from decouple import config
import asyncio
import json
TOKEN_PATH = PROJECT_ROOT / "token.pickle"
TD_AMERITRADE_API_KEY = config("TD_AMERITRADE_API_KEY")
TD_AMERITRADE_REDIRECT_URI = config("TD_AMERITRADE_REDIRECT_URI")
TD_AMERITRADE_ACCOUNT_NUMBER = config("TD_AMERITRADE_ACCOUNT_NUMBER")
CHROMEDRIVER_LOCATION = config("CHROMEDRIVER_LOCATION", LOCAL_CHROMEDRIVER_LOCATION)
DATABASE_URI = config("DATABASE_URI")
try:
api = auth.client_from_token_file(TOKEN_PATH, TD_AMERITRADE_API_KEY)
except FileNotFoundError:
from selenium import webdriver
with webdriver.Chrome(CHROMEDRIVER_LOCATION) as driver:
api = auth.client_from_login_flow(
driver, TD_AMERITRADE_API_KEY, TD_AMERITRADE_REDIRECT_URI, TOKEN_PATH
)
stream_client = StreamClient(api, account_id=TD_AMERITRADE_ACCOUNT_NUMBER)
SYMBOLS = ["/MES", "/ES", "/GC", "/NQ", "/RTY", "/EMD", "/YM", "/NKD", "/VX", "/BTC"]
ENERGY = ["/QG", "/RB", "/HO", "/CL", "/NG", "/QM", "/BZ"]
from pydantic import BaseModel, validate_arguments
from decimal import Decimal
from datetime import datetime
from typing import List
from sqlalchemy import Column, Integer, BigInteger, String, DateTime, Numeric, UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class ChartData(Base):
__tablename__ = "chartdata"
id = Column(Integer, index=True, primary_key=True, autoincrement=True)
seq = Column(BigInteger, nullable=False)
key = Column(String(length=16), nullable=False)
CHART_TIME = Column(DateTime)
OPEN_PRICE = Column(Numeric)
HIGH_PRICE = Column(Numeric)
LOW_PRICE = Column(Numeric)
CLOSE_PRICE = Column(Numeric)
VOLUME = Column(Numeric)
__table_args__ = (
UniqueConstraint("CHART_TIME", "key"),
) # NOTE: May remove constraint to use timescaledb.
class ChartTick(BaseModel):
seq: int
key: str # "/RB",
CHART_TIME: datetime # 1591148220000,
OPEN_PRICE: Decimal # 1.1257000000000001,
HIGH_PRICE: Decimal # 1.1257000000000001,
LOW_PRICE: Decimal # 1.1252,
CLOSE_PRICE: Decimal # 1.1252,
VOLUME: Decimal # 2.0
class ChartFuturesResponse(BaseModel):
service: str
timestamp: datetime
command: str
content: List[ChartTick]
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import exc
engine = create_engine(DATABASE_URI, echo=False)
ChartData.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
@validate_arguments
def insert(response: ChartFuturesResponse):
print(response.service)
print(response.timestamp)
for candle in response.content:
record = ChartData(**candle.dict())
try:
session.add(record)
session.commit()
print("c", flush=True, end="")
except exc.IntegrityError as e:
session.rollback()
print("r", flush=True, end="")
else:
print()
async def read_stream():
await stream_client.login()
await stream_client.quality_of_service(StreamClient.QOSLevel.EXPRESS)
await stream_client.chart_futures_subs(SYMBOLS + ENERGY)
stream_client.add_chart_futures_handler(lambda msg: insert(msg))
while True:
await stream_client.handle_message()
asyncio.get_event_loop().run_until_complete(read_stream())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment