Skip to content

Instantly share code, notes, and snippets.

@raposatech
Last active January 16, 2022 07:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save raposatech/d3f10df41c8745b00cb608bd590a986d to your computer and use it in GitHub Desktop.
Save raposatech/d3f10df41c8745b00cb608bd590a986d to your computer and use it in GitHub Desktop.
Starter System with Multiple Instruments
class DiversifiedStarterSystem(MultiSignalStarterSystem):
    '''
    Carver's Starter System extended to multiple instruments: no stop
    losses, multiple entry rules combined into a single forecast, and a
    forecast-driven position size that is rebalanced when it drifts.
    Adapted from Rob Carver's Leveraged Trading: https://amzn.to/3C1owYn
    Code for MultiSignalStarterSystem available here:
    https://gist.github.com/raposatech/2d9f309e2a54fc9545d44eda821e29ad

    The class expects `np` (numpy), `pd` (pandas) and `yf` (yfinance) to be
    available at module level. `self.data` is a DataFrame whose columns are
    a two-level MultiIndex of (ticker, field).
    '''

    def __init__(self, tickers: list, signals: dict, target_risk: float = 0.12,
                 starting_capital: float = 1000, margin_cost: float = 0.04,
                 short_cost: float = 0.001, interest_on_balance: float = 0.0,
                 start: str = '2000-01-01', end: str = '2020-12-31',
                 shorts: bool = True, weights: list = None,
                 max_forecast: float = 2, min_forecast: float = -2,
                 exposure_drift: float = 0.1,
                 *args, **kwargs):
        '''
        Store the configuration, download prices and build all forecasts.

        tickers: instruments to trade (must not be empty).
        signals: mapping of rule family ('MAC', 'MBO', 'CAR') to a dict of
            parameter dicts; see _calcSignals for the keys that are read.
        target_risk: annualised risk target used when sizing positions.
        starting_capital: cash available on the first bar.
        margin_cost / short_cost / interest_on_balance: annual rates.
        start, end: date range handed to the price download.
        shorts: when False, bearish forecasts only close long positions.
        weights: per-signal weights consumed by the inherited _setWeights;
            omitted or None means an empty list.
        max_forecast, min_forecast: clipping bounds for each forecast.
        exposure_drift: minimum relative drift that triggers a rebalance.
        kwargs: an optional `max_leverage` (default 3) caps exposure as a
            multiple of capital; other extra arguments are ignored.
        '''
        if len(tickers) == 0:
            raise ValueError('tickers must contain at least one instrument')
        self.tickers = tickers
        self.n_instruments = len(tickers)
        self.signals = signals
        self.target_risk = target_risk
        self.starting_capital = starting_capital
        self.shorts = shorts
        self.start = start
        self.end = end
        self.margin_cost = margin_cost
        self.short_cost = short_cost
        self.interest_on_balance = interest_on_balance
        self.daily_iob = (1 + self.interest_on_balance) ** (1 / 252)
        self.daily_margin_cost = (1 + self.margin_cost) ** (1 / 252)
        self.daily_short_cost = self.short_cost / 360
        self.max_forecast = max_forecast
        self.min_forecast = min_forecast
        self.max_leverage = kwargs.get('max_leverage', 3)
        self.exposure_drift = exposure_drift
        self.signal_names = []
        # A None sentinel replaces the former `[]` default so that instances
        # never share (and mutate) one list object.
        self.weights = [] if weights is None else weights
        # Instrument diversification multiplier keyed by the minimum number
        # of instruments it applies to.
        self.idm_dict = {
            1: 1,
            2: 1.15,
            3: 1.22,
            4: 1.27,
            5: 1.29,
            6: 1.31,
            7: 1.32,
            8: 1.34,
            15: 1.36,
            25: 1.38,
            30: 1.4,
        }
        self._getData()
        self._calcSignals()
        self._setWeights()
        self._calcTotalSignal()
        self._setIDM()

    def _getData(self):
        '''Download Close and Dividends for every ticker into self.data.'''
        yfObj = yf.Tickers(self.tickers)
        df = yfObj.history(start=self.start, end=self.end)
        # Select the two fields that are used instead of dropping a fixed
        # list of others, so extra or missing provider columns do not matter.
        df = df[['Close', 'Dividends']]
        # Keep only the rows where every instrument has a closing price.
        df = df[df['Close'].notna().all(axis=1)]
        # (field, ticker) -> (ticker, field)
        df.columns = df.columns.swaplevel()
        self.data = df.fillna(0)

    def _fieldValues(self, field):
        '''
        Return `field` for all instruments as a 2-D float array whose
        columns follow the order of self.tickers (aligned by name).
        '''
        frame = self.data.xs(field, axis=1, level=1)
        return frame.loc[:, list(self.tickers)].to_numpy(dtype=float)

    def _setIDM(self):
        '''Pick the diversification multiplier for the instrument count.'''
        thresholds = np.array(list(self.idm_dict.keys()))
        eligible = thresholds[thresholds <= self.n_instruments]
        if eligible.size == 0:
            raise ValueError(
                f'no IDM defined for {self.n_instruments} instruments')
        self.idm = self.idm_dict[eligible.max()]

    def _clipForecast(self, signal):
        '''Bound a forecast to [min_forecast, max_forecast].'''
        return signal.clip(upper=self.max_forecast, lower=self.min_forecast)

    def _calcMAC(self, fast, slow, scale):
        '''
        Moving-average crossover forecast: (fast SMA - slow SMA) divided by
        price * annualised volatility, multiplied by `scale` and clipped.
        '''
        name = f'MAC{self.n_sigs}'
        close = self.data.loc[:, (slice(None), 'Close')]
        sma_f = close.rolling(fast).mean()
        sma_s = close.rolling(slow).mean()
        # .values: combine positionally, the column labels differ
        # ('Close' versus 'STD').
        risk_units = close * self.data.loc[:, (slice(None), 'STD')].values
        sig = sma_f - sma_s
        sig = sig.ffill().fillna(0) / risk_units * scale
        self.signal_names.append(name)
        return self._clipForecast(sig).rename(columns={'Close': name})

    def _calcMBO(self, periods, scale):
        '''
        Breakout forecast: distance of price from its rolling mean as a
        fraction of the rolling high-low range, scaled and clipped.
        '''
        name = f'MBO{self.n_sigs}'
        close = self.data.loc[:, (slice(None), 'Close')]
        ul = close.rolling(periods).max().values
        ll = close.rolling(periods).min().values
        mean = close.rolling(periods).mean()
        sprice = (close - mean) / (ul - ll)
        sig = sprice.ffill().fillna(0) * scale
        self.signal_names.append(name)
        return self._clipForecast(sig).rename(columns={'Close': name})

    def _calcCarry(self, scale):
        '''
        Carry forecast from the trailing 252-bar dividend yield net of
        funding costs, divided by volatility, scaled and clipped.
        '''
        name = f'Carry{self.n_sigs}'
        ttm_div = self.data.loc[:, (slice(None), 'Dividends')].rolling(252).sum()
        div_yield = ttm_div / self.data.loc[:, (slice(None), 'Close')].values
        net_long = div_yield - self.margin_cost
        net_short = self.interest_on_balance - self.short_cost - div_yield
        net_return = (net_long - net_short) / 2
        sig = net_return / self.data.loc[:, (slice(None), 'STD')].values * scale
        self.signal_names.append(name)
        return self._clipForecast(sig).rename(columns={'Dividends': name})

    def _calcSignals(self):
        '''
        Add the annualised volatility ('STD') and one column per configured
        rule to self.data.

        Keys read from self.signals: 'MAC' entries need 'fast', 'slow' and
        'scale'; 'MBO' entries need 'N' and 'scale'; 'CAR' entries need
        'status' (skipped when falsy) and 'scale'. Other families are
        ignored.
        '''
        close = self.data.loc[:, (slice(None), 'Close')]
        std = close.pct_change().rolling(252).std() * np.sqrt(252)
        self.data = pd.concat(
            [self.data, std.rename(columns={'Close': 'STD'})], axis=1)

        builders = {
            'MAC': lambda p: self._calcMAC(p['fast'], p['slow'], p['scale']),
            'MBO': lambda p: self._calcMBO(p['N'], p['scale']),
            'CAR': lambda p: (self._calcCarry(p['scale'])
                              if p['status'] else None),
        }
        self.n_sigs = 0
        frames = []
        for family, rules in self.signals.items():
            build = builders.get(family)
            if build is None:
                continue
            for params in rules.values():
                sig = build(params)
                if sig is None:
                    continue
                frames.append(sig)
                # Incremented after the call: the rule's column name embeds
                # the count as it was when the rule was built.
                self.n_sigs += 1
        if frames:
            # The rules only read Close/Dividends/STD, so one concat at the
            # end gives the same columns as concatenating after every rule.
            self.data = pd.concat([self.data, *frames], axis=1)

    def _calcTotalSignal(self):
        '''
        Combine each instrument's forecasts into one weighted 'signal'
        column. A bar where any forecast is NaN gets a signal of 0.
        '''
        weights = np.asarray(self.signal_weights, dtype=float)
        # Work ticker by ticker so every result is attached to the
        # instrument it was computed from, whatever the column ordering.
        combined = [
            self.data[ticker][self.signal_names].to_numpy(dtype=float) @ weights
            for ticker in self.tickers
        ]
        midx = pd.MultiIndex.from_product([self.tickers, ['signal']])
        sigs = pd.DataFrame(np.column_stack(combined),
                            index=self.data.index, columns=midx).fillna(0)
        self.data = pd.concat([self.data, sigs], axis=1)

    def _sizePositions(self, cash, price, instrument_risk, signal, positions, index):
        '''
        Return the number of shares (always >= 0) to hold for the
        instruments listed in `index`; every other entry is 0. The caller
        applies the direction (adds for longs, subtracts for shorts).

        Each instrument gets an equal share of total capital, is sized by
        forecast strength over risk, and is capped at max_leverage times
        its share of cash.
        '''
        shares = np.zeros(self.n_instruments)
        if cash <= 0:
            return shares
        capital = (cash + np.dot(price, positions)) / self.n_instruments
        if not capital > 0:  # also catches NaN
            return shares
        px = price[index]
        risk = instrument_risk[index]
        # Magnitude only: a signed forecast would yield negative shares that
        # the caller's subtraction turns into a long position.
        strength = np.abs(signal[index])
        # Risk is NaN until the volatility window has filled; such
        # instruments cannot be sized and stay at zero shares.
        sizable = (np.isfinite(risk) & (risk > 0)
                   & np.isfinite(px) & (px > 0))
        exposure = np.zeros(px.shape)
        exposure[sizable] = (self.target_risk * self.idm * capital
                             * strength[sizable] / risk[sizable])
        exposure = np.minimum(
            exposure, cash * self.max_leverage / self.n_instruments)
        shares[index] = np.floor(np.divide(
            exposure, px, out=np.zeros(px.shape), where=sizable))
        return shares

    def _getExposureDrift(self, cash, position, price, signal, instrument_risk):
        '''
        Return (drift, avg_exposure) per instrument, where drift is
        (target exposure - current exposure) / avg_exposure and
        avg_exposure is the exposure of a unit-strength forecast, capped in
        magnitude at max_leverage times per-instrument capital. Both are
        zero arrays when nothing is held.
        '''
        # any() rather than sum(): offsetting long and short share counts
        # can sum to zero while positions are still open.
        if not np.any(position):
            return np.zeros(self.n_instruments), np.zeros(self.n_instruments)
        # Same capital definition as _sizePositions: total equity split
        # evenly across instruments.
        capital = (cash + np.dot(price, position)) / self.n_instruments
        risk_budget = self.target_risk * self.idm * capital / instrument_risk
        exposure = risk_budget * signal
        cur_exposure = price * position
        avg_exposure = risk_budget * np.sign(signal)
        # Cap exposure leverage on both the long and the short side.
        cap = self.max_leverage * abs(capital)
        avg_exposure = np.sign(avg_exposure) * np.minimum(
            np.abs(avg_exposure), cap)
        # Instruments with a zero forecast have avg_exposure == 0; report
        # zero drift for them instead of dividing by zero.
        drift = np.divide(exposure - cur_exposure, avg_exposure,
                          out=np.zeros(self.n_instruments),
                          where=avg_exposure != 0)
        return drift, avg_exposure

    def _calcCash(self, cash_balance, positions, dividends):
        '''
        Carry the cash balance forward one bar: accrue interest on a
        positive balance or margin cost on a negative one, then add
        dividends received on longs and subtract those owed on shorts.
        '''
        rate = self.daily_iob if cash_balance > 0 else self.daily_margin_cost
        cash = cash_balance * rate
        # Short positions are negative, so one signed dot product credits
        # longs and debits shorts.
        held = (positions > 0) | (positions < 0)
        if held.any():
            cash += np.dot(positions[held], dividends[held])
        return cash

    def _logData(self, positions, cash, rebalance, exp_delta):
        '''Attach the per-bar simulation results to self.data.'''
        self.data['cash'] = cash
        frames = [self.data]
        logged = (('position', positions),
                  ('rebalance', rebalance),
                  ('exposure_drift', exp_delta))
        for field, values in logged:
            midx = pd.MultiIndex.from_product([self.tickers, [field]])
            frames.append(
                pd.DataFrame(values, index=self.data.index, columns=midx))
        self.data = pd.concat(frames, axis=1)
        self.data['portfolio'] = np.sum(
            self._fieldValues('Close') * positions, axis=1) + cash

    def _processBar(self, prices, sigs, stds, pos, cash):
        '''
        Apply one bar of trading rules. `pos` is modified in place.

        Returns (positions, cash, rebalanced shares, exposure drift).
        '''
        # Flat or short with a bullish forecast: cover, then go long.
        open_long = np.where((pos <= 0) & (sigs > 0))[0]
        if open_long.size:
            lprices = prices[open_long]
            cash += np.dot(pos[open_long], lprices)
            pos[open_long] = 0
            pos += self._sizePositions(cash, prices, stds, sigs,
                                       pos, open_long)
            cash -= np.dot(pos[open_long], lprices)

        # Flat or long with a bearish forecast: sell, then short if allowed.
        open_short = np.where((pos >= 0) & (sigs < 0))[0]
        if open_short.size:
            sprices = prices[open_short]
            cash += np.dot(pos[open_short], sprices)
            pos[open_short] = 0
            if self.shorts:
                pos -= self._sizePositions(cash, prices, stds, sigs,
                                           pos, open_short)
                # Positions are negative here, so this credits the proceeds.
                cash -= np.dot(pos[open_short], sprices)

        # Forecast of exactly zero: close whatever is held.
        neutral = np.where((pos != 0) & (sigs == 0))[0]
        if neutral.size:
            cash += np.dot(pos[neutral], prices[neutral])
            pos[neutral] = 0

        # Rebalance existing positions only. Instruments with no position
        # are left alone so that a rebalance can never open a short while
        # shorts are disabled.
        delta_exposure, avg_exposure = self._getExposureDrift(
            cash, pos, prices, sigs, stds)
        drifted = (np.abs(delta_exposure) >= self.exposure_drift) & (pos != 0)
        reb_shares = np.zeros(self.n_instruments)
        if drifted.any():
            trade = np.round(delta_exposure * avg_exposure / prices)
            reb_shares[drifted] = trade[drifted]
            cash -= np.dot(reb_shares, prices)
            pos += reb_shares
        return pos, cash, reb_shares, delta_exposure

    def run(self):
        '''Simulate the strategy bar by bar and store results in self.data.'''
        closes = self._fieldValues('Close')
        dividends = self._fieldValues('Dividends')
        signals = self._fieldValues('signal')
        risks = self._fieldValues('STD')

        n_bars = closes.shape[0]
        positions = np.zeros((n_bars, self.n_instruments))
        exp_delta = positions.copy()
        rebalance = positions.copy()
        cash = np.zeros(n_bars)

        pos = np.zeros(self.n_instruments)
        cash_t = self.starting_capital
        bars = zip(closes, dividends, signals, risks)
        for i, (prices, divs, sigs, stds) in enumerate(bars):
            if i > 0:
                # Dividends accrue on the positions carried in from the
                # previous bar (the current bar's row is still all zeros).
                cash_t = self._calcCash(cash_t, pos, divs)
            pos, cash_t, shares, delta_exp = self._processBar(
                prices, sigs, stds, pos.copy(), cash_t)
            positions[i] = pos
            cash[i] = cash_t
            rebalance[i] = shares
            exp_delta[i] = delta_exp

        self._logData(positions, cash, rebalance, exp_delta)
        self._calcReturns()

    def _calcReturns(self):
        '''Derive log returns, cumulative returns and running peak.'''
        portfolio = self.data['portfolio']
        self.data['strat_log_returns'] = np.log(portfolio / portfolio.shift(1))
        self.data['strat_cum_returns'] = np.exp(
            self.data['strat_log_returns'].cumsum()) - 1
        self.data['strat_peak'] = self.data['strat_cum_returns'].cummax()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment