Skip to content

Instantly share code, notes, and snippets.

@donoage
Last active May 7, 2021 16:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save donoage/5e79e975ba1b98d70d0ba8a361d05d3b to your computer and use it in GitHub Desktop.
Save donoage/5e79e975ba1b98d70d0ba8a361d05d3b to your computer and use it in GitHub Desktop.
common.py
import datetime
import logging
import requests
import tda
import tda.orders.options
import httpx
from dotenv import dotenv_values
import validation as vp
# Location of the TDA OAuth token file handed to authenticate_tda_account().
TD_TOKEN_PATH = "config/tda_token.json"
# Chromedriver binary used by the Selenium login-flow fallback.
CHROMEDRIVER_PATH = "bins/chromedriver"

# Per-origin order sizing, keyed by a transaction's "origin_id".
#   risky / spy / swing   -> budget used to size the position for that
#                            kind of trade (see get_num_contracts)
#   buy_limit_percentage  -> fraction added to the quoted price to get the
#                            buy limit (see calc_buy_limit_price)
#   SL_percentage         -> fraction subtracted from the quoted price to get
#                            the stop-loss price (see calc_sl_price)
ORDER_SETTINGS = {
    "eva": {
        "risky": 500.00,
        "spy": 500.00,
        "swing": 500.00,
        "buy_limit_percentage": 0.10,
        "SL_percentage": 0.25,
    },
    "wolf": {
        "risky": 1000.00,
        "spy": 1000.00,
        "swing": 1000.00,
        "buy_limit_percentage": 0.10,
        "SL_percentage": 0.25,
    },
    "kin": {
        "risky": 1000.00,
        "spy": 1000.00,
        "swing": 1500.00,
        "buy_limit_percentage": 0.10,
        "SL_percentage": 0.25,
    },
}
# Base HTTP headers for Discord requests; fetch_messages() copies this dict
# and adds a per-channel "referer".
# NOTE(review): 'Authorization' is blank here — presumably filled in at
# runtime from local configuration; confirm against the caller.
discord_headers = {
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; WOW64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'discord/0.0.305 Chrome/69.0.3497.128 '
        'Electron/4.0.8 Safari/537.36'
    ),
    'Authorization': '',
}

# TDA credentials read by place_orders(): API key, OAuth redirect URI and
# the account number orders are placed against. Blank placeholders here.
td_auth_params = {
    'API_KEY': '',
    'URI': '',
    'ACCOUNT_NUM': '',
}
def get_time_n_hours_ago(hour_delta):
    """Return the Unix timestamp of the moment ``hour_delta`` hours ago.

    Args:
        hour_delta: Number of hours to subtract from the current local time.

    Returns:
        The whole-second Unix epoch time, as a decimal string.
    """
    hours_ago = datetime.datetime.now() - datetime.timedelta(hours=hour_delta)
    # strftime("%s") is a non-standard libc extension (unsupported on
    # Windows). timestamp() interprets the naive value as local time in the
    # same way and works on every platform.
    return str(int(hours_ago.timestamp()))
def snowflake(hour_delta):
    """Return a snowflake-style ID for the moment ``hour_delta`` hours ago.

    The value is the millisecond offset from 1420070400000 ms
    (2015-01-01T00:00:00Z) shifted left by 22 bits. fetch_messages() accepts
    such a value as its ``start_at`` / ``min_id`` lower bound.
    """
    epoch_offset_ms = 1420070400000
    unix_ms = int(get_time_n_hours_ago(hour_delta)) * 1000
    return (unix_ms - epoch_offset_ms) << 22
def fetch_messages(guild_id, channel_id, start_at, seen_messages, timeout=10):
    """Fetch messages from a Discord channel that have not been seen yet.

    Args:
        guild_id: Discord guild (server) id used in the search URL.
        channel_id: Channel id to search within.
        start_at: Lower-bound message id, sent as ``min_id``.
        seen_messages: List of already-processed message ids. It is mutated
            in place: ids of newly returned messages are appended.
        timeout: Seconds to wait for the HTTP request before
            ``requests`` raises a timeout error.

    Returns:
        The unseen message dicts, in the reverse of the order they appear in
        the response. An empty list if the response has no ``'messages'`` key.
    """
    fetch_headers = discord_headers.copy()
    fetch_headers['referer'] = f'https://discordapp.com/channels/{guild_id}/{channel_id}'
    path = f'https://discordapp.com/api/v6/guilds/{guild_id}/messages/search?channel_id={channel_id}&min_id={start_at}'
    # Without a timeout a stalled connection would block the caller forever.
    response = requests.get(path, headers=fetch_headers, timeout=timeout)
    json_response = response.json()
    if 'messages' not in json_response:
        return []

    new_messages = []
    for message in json_response['messages']:
        # Each entry is indexed at [0]; skip empty entries rather than
        # raising IndexError on them.
        if not message or not message[0]:
            continue
        hit = message[0]
        if hit['id'] not in seen_messages:
            seen_messages.append(hit['id'])
            new_messages.append(hit)
    new_messages.reverse()
    return new_messages
def place_orders(transactions, active_transactions_db):
    """Authenticate with TDA and place an order for each transaction.

    Args:
        transactions: Iterable of raw transactions; each one is normalised
            with ``vp.reformat_params`` before use.
        active_transactions_db: Mapping of open trades keyed by
            ``transaction['key']``. It is updated in place: a BTO adds its
            key, an STC removes it.
    """
    client = authenticate_tda_account(TD_TOKEN_PATH, td_auth_params['API_KEY'], td_auth_params['URI'])
    for transaction in transactions:
        print('')
        print('---------------------------------')
        print(f'transaction: {transaction}')
        transaction = vp.reformat_params(transaction)
        if transaction['type'] == 'BTO':
            if transaction['key'] in active_transactions_db:
                print("Skipping, already in this trade.")
                # Skip only this transaction; a `return` here would silently
                # drop every remaining transaction in the batch.
                continue
            order_settings = ORDER_SETTINGS[transaction["origin_id"]]
            max_price = calc_buy_limit_price(transaction["price"], order_settings["buy_limit_percentage"])
            sl_percent = order_settings["SL_percentage"]
            process_bto_order(client, td_auth_params['ACCOUNT_NUM'], transaction, max_price, sl_percent)
            # The key itself is stored as the value; only its presence
            # (and truthiness) is consulted later.
            active_transactions_db[transaction['key']] = transaction["key"]
        elif transaction['type'] == 'STC':
            old_buy_trade = active_transactions_db.get(transaction['key'])
            if old_buy_trade:
                process_stc_order(client, td_auth_params['ACCOUNT_NUM'], transaction)
            if transaction['key'] in active_transactions_db:
                del active_transactions_db[transaction['key']]
def authenticate_tda_account(token_path: str, api_key: str, redirect_uri: str):
    """Return an authenticated TDA client.

    First tries the token file at ``token_path``. If that file does not
    exist, falls back to the browser login flow driven by Selenium Chrome,
    passing ``token_path`` along to it.

    Args:
        token_path: Path of the locally stored auth token file.
        api_key: TDA application key.
        redirect_uri: Redirect URI registered for the TDA application.

    Returns:
        The client object produced by ``tda.auth``.
    """
    try:
        # tda.auth automatically creates and updates the token file.
        return tda.auth.client_from_token_file(token_path, api_key)
    except FileNotFoundError:
        # Expected on first log-in, before any token has been saved.
        # Imported lazily so Selenium is only needed for this fallback.
        from selenium import webdriver

        # The chromedriver executable for this OS must already exist at
        # CHROMEDRIVER_PATH.
        with webdriver.Chrome(executable_path=CHROMEDRIVER_PATH) as browser:
            client = tda.auth.client_from_login_flow(
                browser, api_key, redirect_uri, token_path
            )
        return client
def process_bto_order(client, acct_num: str, ord_params: dict, max_price, sl_percent):
    """Prepare and place a BTO order with an attached stop-loss order.

    Args:
        client: Authenticated TDA client exposing ``place_order``.
        acct_num: Account number the order is placed against.
        ord_params: Reformatted transaction parameters (must contain
            ``"price"`` plus whatever ``get_num_contracts`` and
            ``build_option_symbol`` read).
        max_price: Buy limit price for the opening order.
        sl_percent: Fraction below ``ord_params["price"]`` at which the
            stop-loss is set.

    Failures are reported (printed and logged) but not re-raised, so one bad
    order does not stop the caller's loop.
    """
    buy_qty = get_num_contracts(ord_params, max_price)
    option_symbol = build_option_symbol(ord_params)
    sl_price = calc_sl_price(ord_params["price"], sl_percent)
    # prepare buy limit order and accompanying stop loss order
    ota_order = build_bto_order_w_stop_loss(
        option_symbol, buy_qty, max_price, sl_price
    )
    try:
        response = client.place_order(acct_num, order_spec=ota_order)
        # Raise only on real HTTP errors. The previous
        # `assert status == 200` vanished under `python -O` and treated any
        # successful non-200 2xx response (e.g. 201 Created) as a failure.
        response.raise_for_status()
        output_response(ord_params, response)
    except Exception as e:
        # Deliberate best-effort: report and carry on.
        logging.exception("BTO order failed for %s", option_symbol)
        print(f'exception: {e}')
def build_option_symbol(ord_params: dict):
    """Return the option symbol string for the given order parameters.

    Reads ``"ticker"``, ``"exp"``, ``"td_order_type"`` and ``"strike"`` from
    ``ord_params``. Note that ``"exp"`` must be a ``datetime.datetime``
    object.
    """
    builder = tda.orders.options.OptionSymbol(
        underlying_symbol=ord_params["ticker"],
        expiration_date=ord_params["exp"],  # datetime.datetime obj
        contract_type=ord_params["td_order_type"],
        strike_price_as_string=ord_params["strike"],
    )
    # OptionSymbol yields the symbol only once build() is called.
    return builder.build()
def calc_sl_price(contract_price, sl_percent):
    """Return the stop-loss price: ``sl_percent`` below ``contract_price``.

    ``sl_percent`` is a fraction (0.25 means 25%). The result is rounded to
    two decimal places.
    """
    remaining_fraction = 1 - sl_percent
    return round(contract_price * remaining_fraction, 2)
def calc_buy_limit_price(contract_price, buy_limit_percent):
    """Return the buy limit: ``buy_limit_percent`` above ``contract_price``.

    ``buy_limit_percent`` is a fraction (0.10 means 10%). The result is
    rounded to two decimal places.
    """
    markup_factor = 1 + buy_limit_percent
    return round(contract_price * markup_factor, 2)
def build_bto_order_w_stop_loss(
    option_symbol: str, qty: int, buy_lim_price: float, sl_price: float, kill_fill=True,
):
    """Build a one-triggers-another order: BTO limit, then an STC stop order.

    The first leg is a buy-to-open limit at ``buy_lim_price``. The triggered
    leg comes from ``build_stc_stop_limit_order`` at ``sl_price``. Both legs
    are echoed to stdout as they are prepared.

    ``kill_fill`` is accepted but not used by this function.

    Returns:
        The combined order from ``tda.orders.common.first_triggers_second``.
    """
    banner = '============='

    # Opening leg: buy-to-open limit.
    print('BUY LIMIT ORDER')
    print(f'option symbol: {option_symbol}')
    print(f'qty: {qty}')
    print(f'buy_lim_price: {buy_lim_price}')
    print(banner)
    opening_leg = tda.orders.options.option_buy_to_open_limit(
        option_symbol, qty, buy_lim_price
    )

    # Protective leg, activated once the opening leg fills.
    print('SELL STOP MARKET ORDER')
    print(f'option symbol: {option_symbol}')
    print(f'qty: {qty}')
    print(f'sl_price: {sl_price}')
    print(banner)
    protective_leg = build_stc_stop_limit_order(option_symbol, qty, sl_price)

    return tda.orders.common.first_triggers_second(opening_leg, protective_leg)
def build_oco_order_w_stop_loss(option_symbol: str, qty: int, sell_lim_price: float, sl_price: float):
    """Build a one-cancels-the-other order: STC limit vs. STC stop-market.

    The first leg is a sell-to-close limit at ``sell_lim_price``; the second
    comes from ``build_stc_stop_market_order`` at ``sl_price``. Both legs
    are echoed to stdout as they are prepared.

    Returns:
        The combined order from ``tda.orders.common.one_cancels_other``.
    """
    banner = '============='

    # Profit-taking leg.
    print('SELL LIMIT ORDER')
    print(f'option symbol: {option_symbol}')
    print(f'qty: {qty}')
    print(f'sell_lim_price: {sell_lim_price}')
    print(banner)
    take_profit_leg = tda.orders.options.option_sell_to_close_limit(
        option_symbol, qty, sell_lim_price
    )

    # Stop-loss leg.
    print('SELL STOP MARKET ORDER')
    print(f'option symbol: {option_symbol}')
    print(f'qty: {qty}')
    print(f'sl_price: {sl_price}')
    print(banner)
    stop_loss_leg = build_stc_stop_market_order(option_symbol, qty, sl_price)

    return tda.orders.common.one_cancels_other(take_profit_leg, stop_loss_leg)
def build_bto_market_order(option_symbol: str, qty: int):
    """Return a buy-to-open market order for ``qty`` of ``option_symbol``."""
    return tda.orders.options.option_buy_to_open_market(option_symbol, qty)
def build_stc_stop_market_order(symbol: str, qty: int, stop_price: float):
    """Return an order builder for an STC stop-market order.

    Starts from a sell-to-close market order, switches its type to STOP,
    sets the stop price (rounded to two decimals) and makes it
    good-till-cancel.
    """
    common = tda.orders.common
    stop_order = tda.orders.options.option_sell_to_close_market(symbol, qty)
    stop_order.set_order_type(common.OrderType.STOP)
    stop_order.set_stop_price(round(stop_price, 2))  # two-decimal price
    stop_order.set_duration(common.Duration.GOOD_TILL_CANCEL)
    return stop_order
def build_stc_stop_limit_order(symbol: str, qty: int, stop_price: float):
    """Return an order builder for an STC stop-limit order.

    Starts from a sell-to-close limit order priced at ``stop_price``,
    switches its type to STOP_LIMIT, sets the stop price (rounded to two
    decimals) and makes it good-till-cancel. The same price serves as both
    the limit and the stop.
    """
    common = tda.orders.common
    stop_limit_order = tda.orders.options.option_sell_to_close_limit(
        symbol, qty, stop_price
    )
    stop_limit_order.set_order_type(common.OrderType.STOP_LIMIT)
    stop_limit_order.set_stop_price(round(stop_price, 2))  # two-decimal price
    stop_limit_order.set_duration(common.Duration.GOOD_TILL_CANCEL)
    return stop_limit_order
def output_response(ord_params: dict, response):
    """Log the order parameters and the response, then print both.

    Each value is logged at INFO level as-is; the printed lines go to
    stdout.
    """
    for item in (ord_params, response):
        logging.info(item)
    print(f'order_params: {ord_params}')
    print(f"Processed order. Response received: {response}")
def process_stc_order(client, acct_num: str, ord_params: dict):
    """Prepare and place an STC market order for the whole position.

    Does nothing unless the account holds at least one contract of the
    option described by ``ord_params``. Otherwise it cancels the in-effect
    STC orders found for that symbol, then sells the full long quantity at
    market and reports the response.
    """
    option_symbol = build_option_symbol(ord_params)
    held_qty = get_position_quant(client, acct_num, option_symbol)
    if not (held_qty and held_qty >= 1):
        return

    # cancel existing STC orders (like stop-markets) before selling
    for pending_id in get_existing_stc_orders(client, option_symbol):
        cancel_response = client.cancel_order(pending_id, acct_num)
        logging.info(cancel_response.content)

    # sell the entire position, TODO: add trim logic
    closing_order = build_stc_market_order(option_symbol, held_qty)
    print('SELLING TO CLOSE')
    print(f'option symbol: {option_symbol}')
    print(f'qty: {held_qty}')
    print('=============')
    sell_response = client.place_order(acct_num, order_spec=closing_order)
    output_response(ord_params, sell_response)
def get_position_quant(client, acct_id: str, symbol: str):
    """Return the long quantity held for ``symbol`` in the account.

    Args:
        client: Authenticated TDA client exposing ``get_account``.
        acct_id: Account to query.
        symbol: Instrument symbol to look for among the positions.

    Returns:
        ``longQuantity`` of the first matching position as a float, or
        ``None`` when the account has no position in ``symbol``.
    """
    response = client.get_account(
        acct_id, fields=tda.client.Client.Account.Fields.POSITIONS
    )
    summary = response.json()
    # An account with no open positions may not carry a "positions" key at
    # all; treat that as an empty list instead of raising KeyError.
    positions = summary["securitiesAccount"].get("positions", [])
    for position in positions:
        if position["instrument"]["symbol"] == symbol:
            return float(position["longQuantity"])
    return None
def get_position_price(client, acct_id: str, symbol: str):
    """Return the average price of the account's position in ``symbol``.

    Args:
        client: Authenticated TDA client exposing ``get_account``.
        acct_id: Account to query.
        symbol: Instrument symbol to look for among the positions.

    Returns:
        ``averagePrice`` of the first matching position as a float, or
        ``None`` when the account has no position in ``symbol``.
    """
    print('getting position price...')
    response = client.get_account(
        acct_id, fields=tda.client.Client.Account.Fields.POSITIONS
    )
    summary = response.json()
    # An account with no open positions may not carry a "positions" key at
    # all; treat that as an empty list instead of raising KeyError.
    positions = summary["securitiesAccount"].get("positions", [])
    for position in positions:
        if position["instrument"]["symbol"] == symbol:
            return float(position["averagePrice"])
    return None
def get_existing_stc_orders(client, symbol: str, hours=32):
    """Return ids of in-effect single-leg STC orders for ``symbol``.

    Queries orders entered within the last ``hours`` hours and keeps those
    that ``check_stc_order`` accepts. For an order whose strategy type is
    ``"TRIGGER"``, only its first child order is inspected.
    Multi-leg (complex) orders are not handled.

    Args:
        client: Authenticated TDA client exposing ``get_orders_by_query``.
        symbol: Option symbol to match.
        hours: How far back, in hours, to look for orders.

    Returns:
        A list of order ids as strings (possibly empty).
    """
    query_start = datetime.datetime.utcnow() - datetime.timedelta(hours=hours)
    # No status filter is sent; check_stc_order() does the status filtering
    # on the client side.
    response = client.get_orders_by_query(
        from_entered_datetime=query_start, statuses=None
    )
    order_ids = []
    for order in response.json():
        stc_found = check_stc_order(order, symbol)
        if stc_found is not None:
            order_ids.append(stc_found)
            continue
        if order.get("orderStrategyType") != "TRIGGER":
            continue
        # TRIGGER orders carry a child order. Conditional orders with more
        # than one child are not handled, so only the first is checked.
        children = order.get("childOrderStrategies") or []
        if not children:
            continue
        child_order_stc = check_stc_order(children[0], symbol)
        if child_order_stc is not None:
            order_ids.append(child_order_stc)
    return order_ids
def check_stc_order(order, symbol):
    """Return the order id if ``order`` is an in-effect STC for ``symbol``.

    An order qualifies when it has a top-level status of WORKING, QUEUED or
    ACCEPTED, exactly one leg, and that leg is a SELL_TO_CLOSE on ``symbol``.

    Returns:
        The order id as a string, or ``None`` if the order does not qualify.
    """
    # note: OCO orders don't have status as a top-level key
    if "status" not in order:
        return None
    if order["status"] not in ["WORKING", "QUEUED", "ACCEPTED"]:
        return None
    legs = order["orderLegCollection"]
    if len(legs) != 1:  # no multi-leg orders
        return None
    only_leg = legs[0]
    instruction = only_leg["instruction"]
    leg_symbol = only_leg["instrument"]["symbol"]
    if leg_symbol == symbol and instruction == "SELL_TO_CLOSE":
        return str(order["orderId"])
    return None
def build_stc_market_order(symbol: str, pos_qty: float):
    """Return a sell-to-close market order for ``pos_qty`` of ``symbol``."""
    closing_order = tda.orders.options.option_sell_to_close_market(symbol, pos_qty)
    return closing_order
def round_up_to_base(x, base=5):
    """Return ``base * round((x + base) / base)``.

    That is: shift ``x`` up by one ``base``, then snap to the nearest
    multiple of ``base`` using Python's built-in ``round``.

    NOTE(review): this is not a strict ceiling. For example
    ``round_up_to_base(3)`` and ``round_up_to_base(5)`` both give 10.
    Confirm that this is the intended behaviour before relying on it.
    """
    shifted_steps = (x + base) / base
    return base * round(shifted_steps)
def round_down_to_base(x, base=5):
    """Return ``base * round((x - base) / base)``.

    That is: shift ``x`` down by one ``base``, then snap to the nearest
    multiple of ``base`` using Python's built-in ``round``.

    NOTE(review): this is not a strict floor. For example
    ``round_down_to_base(12)`` gives 5 and ``round_down_to_base(7)`` gives
    0. Confirm that this is the intended behaviour before relying on it.
    """
    shifted_steps = (x - base) / base
    return base * round(shifted_steps)
def get_num_contracts(transaction, contract_price):
    """Return how many contracts to buy for ``transaction``.

    The budget comes from ``ORDER_SETTINGS`` for the transaction's
    ``"origin_id"``: the ``"risky"`` budget when ``"is_risky"`` is truthy,
    otherwise ``"swing"``; a ``"ticker"`` of ``"SPY"`` overrides either with
    the ``"spy"`` budget. The count is the budget floor-divided by
    ``100 * contract_price``, with a minimum of one contract.

    Returns:
        The contract count as an int (always at least 1).
    """
    # presumably 100 is the per-contract share multiplier — TODO confirm
    lot_price = 100 * contract_price
    settings = ORDER_SETTINGS[transaction["origin_id"]]

    budget_key = "risky" if transaction["is_risky"] else "swing"
    if transaction["ticker"] == "SPY":
        budget_key = "spy"

    affordable = settings[budget_key] // lot_price
    if affordable > 0:
        return int(affordable)
    # Always buy at least one contract, even if it exceeds the budget.
    return 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment