Skip to content

Instantly share code, notes, and snippets.

@18182324
Last active December 3, 2021 08:35
Show Gist options
  • Save 18182324/c3cd48a9a5e11056e1efba16bfa3f42c to your computer and use it in GitHub Desktop.
Save 18182324/c3cd48a9a5e11056e1efba16bfa3f42c to your computer and use it in GitHub Desktop.
#requirements
#alpaca-trade-api>=0.25
#ta
#sklearn
import alpaca_trade_api as tradeapi
import requests
import time
from ta import macd
import numpy as np
from datetime import datetime, timedelta
from pytz import timezone
# Replace these with your API connection info from the dashboard
base_url = 'Your API URL'
api_key_id = 'Your API Key'
api_secret = 'Your API Secret'
api = tradeapi.REST(
base_url=base_url,
key_id=api_key_id,
secret_key=api_secret
)
session = requests.session()
# We only consider stocks with per-share prices inside this range
min_share_price = 2.0
max_share_price = 13.0
# Minimum previous-day dollar volume for a stock we might consider
min_last_dv = 500000
# Stop limit to default to
default_stop = .95
# How much of our portfolio to allocate to any one position
risk = 0.001
def get_1000m_history_data(symbols):
print('Getting historical data...')
minute_history = {}
c = 0
for symbol in symbols:
minute_history[symbol] = api.polygon.historic_agg(
size="minute", symbol=symbol, limit=1000
).df
c += 1
print('{}/{}'.format(c, len(symbols)))
print('Success.')
return minute_history
def get_tickers():
print('Getting current ticker data...')
tickers = api.polygon.all_tickers()
print('Success.')
assets = api.list_assets()
symbols = [asset.symbol for asset in assets if asset.tradable]
return [ticker for ticker in tickers if (
ticker.ticker in symbols and
ticker.lastTrade['p'] >= min_share_price and
ticker.lastTrade['p'] <= max_share_price and
ticker.prevDay['v'] * ticker.lastTrade['p'] > min_last_dv and
ticker.todaysChangePerc >= 3.5
)]
def find_stop(current_value, minute_history, now):
series = minute_history['low'][-100:] \
.dropna().resample('5min').min()
series = series[now.floor('1D'):]
diff = np.diff(series.values)
low_index = np.where((diff[:-1] <= 0) & (diff[1:] > 0))[0] + 1
if len(low_index) > 0:
return series[low_index[-1]] - 0.01
return current_value * default_stop
def run(tickers, market_open_dt, market_close_dt):
# Establish streaming connection
conn = tradeapi.StreamConn(base_url=base_url, key_id=api_key_id, secret_key=api_secret)
# Update initial state with information from tickers
volume_today = {}
prev_closes = {}
for ticker in tickers:
symbol = ticker.ticker
prev_closes[symbol] = ticker.prevDay['c']
volume_today[symbol] = ticker.day['v']
symbols = [ticker.ticker for ticker in tickers]
print('Tracking {} symbols.'.format(len(symbols)))
minute_history = get_1000m_history_data(symbols)
portfolio_value = float(api.get_account().portfolio_value)
open_orders = {}
positions = {}
# Cancel any existing open orders on watched symbols
existing_orders = api.list_orders(limit=500)
for order in existing_orders:
if order.symbol in symbols:
api.cancel_order(order.id)
stop_prices = {}
latest_cost_basis = {}
# Track any positions bought during previous executions
existing_positions = api.list_positions()
for position in existing_positions:
if position.symbol in symbols:
positions[position.symbol] = float(position.qty)
# Recalculate cost basis and stop price
latest_cost_basis[position.symbol] = float(position.cost_basis)
stop_prices[position.symbol] = (
float(position.cost_basis) * default_stop
)
# Keep track of what we're buying/selling
target_prices = {}
partial_fills = {}
# Use trade updates to keep track of our portfolio
@conn.on(r'trade_update')
async def handle_trade_update(conn, channel, data):
symbol = data.order['symbol']
last_order = open_orders.get(symbol)
if last_order is not None:
event = data.event
if event == 'partial_fill':
qty = int(data.order['filled_qty'])
if data.order['side'] == 'sell':
qty = qty * -1
positions[symbol] = (
positions.get(symbol, 0) - partial_fills.get(symbol, 0)
)
partial_fills[symbol] = qty
positions[symbol] += qty
open_orders[symbol] = data.order
elif event == 'fill':
qty = int(data.order['filled_qty'])
if data.order['side'] == 'sell':
qty = qty * -1
positions[symbol] = (
positions.get(symbol, 0) - partial_fills.get(symbol, 0)
)
partial_fills[symbol] = 0
positions[symbol] += qty
open_orders[symbol] = None
elif event == 'canceled' or event == 'rejected':
partial_fills[symbol] = 0
open_orders[symbol] = None
@conn.on(r'A$')
async def handle_second_bar(conn, channel, data):
symbol = data.symbol
# First, aggregate 1s bars for up-to-date MACD calculations
ts = data.start
ts -= timedelta(seconds=ts.second, microseconds=ts.microsecond)
try:
current = minute_history[data.symbol].loc[ts]
except KeyError:
current = None
new_data = []
if current is None:
new_data = [
data.open,
data.high,
data.low,
data.close,
data.volume
]
else:
new_data = [
current.open,
data.high if data.high > current.high else current.high,
data.low if data.low < current.low else current.low,
data.close,
current.volume + data.volume
]
minute_history[symbol].loc[ts] = new_data
# Next, check for existing orders for the stock
existing_order = open_orders.get(symbol)
if existing_order is not None:
# Make sure the order's not too old
submission_ts = existing_order.submitted_at.astimezone(
timezone('America/New_York')
)
order_lifetime = ts - submission_ts
if order_lifetime.seconds // 60 > 1:
# Cancel it so we can try again for a fill
api.cancel_order(existing_order.id)
return
# Now we check to see if it might be time to buy or sell
since_market_open = ts - market_open_dt
until_market_close = market_close_dt - ts
if (
since_market_open.seconds // 60 > 15 and
since_market_open.seconds // 60 < 60
):
# Check for buy signals
# See if we've already bought in first
position = positions.get(symbol, 0)
if position > 0:
return
# See how high the price went during the first 15 minutes
lbound = market_open_dt
ubound = lbound + timedelta(minutes=15)
high_15m = 0
try:
high_15m = minute_history[symbol][lbound:ubound]['high'].max()
except Exception as e:
# Because we're aggregating on the fly, sometimes the datetime
# index can get messy until it's healed by the minute bars
return
# Get the change since yesterday's market close
daily_pct_change = (
(data.close - prev_closes[symbol]) / prev_closes[symbol]
)
if (
daily_pct_change > .04 and
data.close > high_15m and
volume_today[symbol] > 30000
):
# check for a positive, increasing MACD
hist = macd(
minute_history[symbol]['close'].dropna(),
n_fast=12,
n_slow=26
)
if (
hist[-1] < 0 or
not (hist[-3] < hist[-2] < hist[-1])
):
return
hist = macd(
minute_history[symbol]['close'].dropna(),
n_fast=40,
n_slow=60
)
if hist[-1] < 0 or np.diff(hist)[-1] < 0:
return
# Stock has passed all checks; figure out how much to buy
stop_price = find_stop(
data.close, minute_history[symbol], ts
)
stop_prices[symbol] = stop_price
target_prices[symbol] = data.close + (
(data.close - stop_price) * 3
)
shares_to_buy = portfolio_value * risk // (
data.close - stop_price
)
if shares_to_buy == 0:
shares_to_buy = 1
shares_to_buy -= positions.get(symbol, 0)
if shares_to_buy <= 0:
return
print('Submitting buy for {} shares of {} at {}'.format(
shares_to_buy, symbol, data.close
))
try:
o = api.submit_order(
symbol=symbol, qty=str(shares_to_buy), side='buy',
type='limit', time_in_force='day',
limit_price=str(data.close)
)
open_orders[symbol] = o
latest_cost_basis[symbol] = data.close
except Exception as e:
print(e)
return
if(
since_market_open.seconds // 60 >= 24 and
until_market_close.seconds // 60 > 15
):
# Check for liquidation signals
# We can't liquidate if there's no position
position = positions.get(symbol, 0)
if position == 0:
return
# Sell for a loss if it's fallen below our stop price
# Sell for a loss if it's below our cost basis and MACD < 0
# Sell for a profit if it's above our target price
hist = macd(
minute_history[symbol]['close'].dropna(),
n_fast=13,
n_slow=21
)
if (
data.close <= stop_prices[symbol] or
(data.close >= target_prices[symbol] and hist[-1] <= 0) or
(data.close <= latest_cost_basis[symbol] and hist[-1] <= 0)
):
print('Submitting sell for {} shares of {} at {}'.format(
position, symbol, data.close
))
try:
o = api.submit_order(
symbol=symbol, qty=str(position), side='sell',
type='limit', time_in_force='day',
limit_price=str(data.close)
)
open_orders[symbol] = o
latest_cost_basis[symbol] = data.close
except Exception as e:
print(e)
return
elif (
until_market_close.seconds // 60 <= 15
):
# Liquidate remaining positions on watched symbols at market
try:
position = api.get_position(symbol)
except Exception as e:
# Exception here indicates that we have no position
return
print('Trading over, liquidating remaining position in {}'.format(
symbol)
)
api.submit_order(
symbol=symbol, qty=position.qty, side='sell',
type='market', time_in_force='day'
)
symbols.remove(symbol)
if len(symbols) <= 0:
conn.close()
conn.deregister([
'A.{}'.format(symbol),
'AM.{}'.format(symbol)
])
# Replace aggregated 1s bars with incoming 1m bars
@conn.on(r'AM$')
async def handle_minute_bar(conn, channel, data):
ts = data.start
ts -= timedelta(microseconds=ts.microsecond)
minute_history[data.symbol].loc[ts] = [
data.open,
data.high,
data.low,
data.close,
data.volume
]
volume_today[data.symbol] += data.volume
channels = ['trade_updates']
for symbol in symbols:
symbol_channels = ['A.{}'.format(symbol), 'AM.{}'.format(symbol)]
channels += symbol_channels
print('Watching {} symbols.'.format(len(symbols)))
run_ws(conn, channels)
# Handle failed websocket connections by reconnecting
def run_ws(conn, channels):
try:
conn.run(channels)
except Exception as e:
print(e)
conn.close()
run_ws(conn, channels)
if __name__ == "__main__":
# Get when the market opens or opened today
nyc = timezone('America/New_York')
today = datetime.today().astimezone(nyc)
today_str = datetime.today().astimezone(nyc).strftime('%Y-%m-%d')
calendar = api.get_calendar(start=today_str, end=today_str)[0]
market_open = today.replace(
hour=calendar.open.hour,
minute=calendar.open.minute,
second=0
)
market_open = market_open.astimezone(nyc)
market_close = today.replace(
hour=calendar.close.hour,
minute=calendar.close.minute,
second=0
)
market_close = market_close.astimezone(nyc)
# Wait until just before we might want to trade
current_dt = datetime.today().astimezone(nyc)
since_market_open = current_dt - market_open
while since_market_open.seconds // 60 <= 14:
time.sleep(1)
since_market_open = current_dt - market_open
run(get_tickers(), market_open, market_close)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment