Skip to content

Instantly share code, notes, and snippets.

@raposatech
Created November 12, 2021 17:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save raposatech/c86f1f5e709b25f0b8e176ef303dddc3 to your computer and use it in GitHub Desktop.
Save raposatech/c86f1f5e709b25f0b8e176ef303dddc3 to your computer and use it in GitHub Desktop.
Starter System with Forecast
class ForecastStarterSystem(ContinuousStarterSystem):
    '''
    Carver's Starter System without stop losses, using multiple entry
    rules that are combined into a forecast for position sizing and
    rebalancing.

    Adapted from Rob Carver's Leveraged Trading: https://amzn.to/3C1owYn
    ContinuousStarterSystem class defined here:
    https://gist.github.com/raposatech/49ccc66f5c312f939f8826251c55a676

    Every configured rule (moving-average crossover, breakout, carry) is
    scaled, clipped to [min_forecast, max_forecast] and stored as its own
    column of self.data. The weighted sum of those columns becomes the
    'signal' column that run() trades on.
    '''

    def __init__(self, ticker: str, signals: dict, target_risk: float = 0.12,
                 starting_capital: float = 1000, margin_cost: float = 0.04,
                 short_cost: float = 0.001, interest_on_balance: float = 0.0,
                 start: str = '2000-01-01', end: str = '2020-12-31',
                 shorts: bool = True, weights: list = None,
                 max_forecast: float = 2, min_forecast: float = -2,
                 exposure_drift: float = 0.1,
                 *args, **kwargs):
        '''
        Store the forecast settings, initialise the parent system and
        build the combined 'signal' column.

        Parameters
        ----------
        signals : dict
            Rule configuration consumed by _calcSignals (see there for
            the keys that are read).
        weights : list, optional
            Per-signal weights forwarded to the parent. None is passed
            on as an empty list.
        max_forecast, min_forecast : float
            Upper and lower clip levels applied to every forecast.
        exposure_drift : float
            Relative exposure deviation at or above which run()
            rebalances.

        All remaining named parameters are forwarded unchanged to
        ContinuousStarterSystem.__init__. Extra *args / **kwargs are
        accepted but not used.
        '''
        # Must be set before super().__init__: presumably the parent
        # constructor triggers _calcSignals, which reads them via
        # _clipForecast -- TODO confirm against the parent class.
        self.max_forecast = max_forecast
        self.min_forecast = min_forecast
        self.exposure_drift = exposure_drift
        super().__init__(
            ticker=ticker, signals=signals, target_risk=target_risk,
            margin_cost=margin_cost, short_cost=short_cost, start=start,
            end=end, interest_on_balance=interest_on_balance,
            starting_capital=starting_capital, shorts=shorts,
            # A fresh list per call avoids sharing one mutable default.
            weights=[] if weights is None else weights)
        self._calcTotalSignal()

    def _clipForecast(self, signal, name):
        '''
        Clip a raw forecast to [min_forecast, max_forecast], store it in
        self.data[name] and register name in self.signal_names.
        '''
        # NaN entries pass through np.clip unchanged.
        self.data[name] = np.clip(np.asarray(signal, dtype=float),
                                  self.min_forecast, self.max_forecast)
        self.signal_names.append(name)

    def _calcMAC(self, fast, slow, scale):
        '''
        Moving-average-crossover forecast: (SMA(fast) - SMA(slow)) divided
        by the 'risk_units' column and multiplied by scale.
        '''
        name = f'MAC{self.n_sigs}'
        # Reuse SMA columns already computed for another MAC rule.
        for window in (fast, slow):
            column = f'SMA{window}'
            if column not in self.data.columns:
                self.data[column] = \
                    self.data['Close'].rolling(window).mean()
        spread = self.data[f'SMA{fast}'] - self.data[f'SMA{slow}']
        forecast = spread.ffill().fillna(0) / self.data['risk_units'] * scale
        self._clipForecast(forecast, name)

    def _calcMBO(self, periods, scale):
        '''
        Breakout forecast: position of the close relative to its rolling
        mean, normalised by the rolling high-low range and multiplied by
        scale.
        '''
        name = f'MBO{self.n_sigs}'
        close = self.data['Close']
        window = close.rolling(periods)
        upper = window.max()
        lower = window.min()
        mean = window.mean()
        # A flat window has zero range; treat it as missing instead of
        # dividing by zero (rounding noise in the numerator would
        # otherwise become +/-inf and be clipped to a full forecast).
        price_range = (upper - lower).replace(0, np.nan)
        self.data[f'SPrice{periods}'] = (close - mean) / price_range
        forecast = self.data[f'SPrice{periods}'].ffill().fillna(0) * scale
        self._clipForecast(forecast, name)

    def _calcCarry(self, scale):
        '''
        Carry forecast: half the difference between the net return of a
        long and of a short position, divided by the 'STD' column and
        multiplied by scale.
        '''
        name = f'Carry{self.n_sigs}'
        ttm_div = self.data['Dividends'].rolling(252).sum()
        div_yield = ttm_div / self.data['Close']
        net_long = div_yield - self.margin_cost
        # All terms are rates, so the dividend owed on a short must be
        # the yield, not the trailing dividend amount per share.
        net_short = self.interest_on_balance - self.short_cost - div_yield
        self.data['net_return'] = (net_long - net_short) / 2
        forecast = self.data['net_return'] / self.data['STD'] * scale
        self._clipForecast(forecast, name)

    def _calcSignals(self):
        '''
        Compute the volatility columns and one forecast column per rule
        configured in self.signals.

        Keys read from self.signals (any other key is ignored):
          'MAC': dict of dicts with 'fast', 'slow' and 'scale'
          'MBO': dict of dicts with 'N' and 'scale'
          'CAR': dict of dicts with 'status' and 'scale'; the rule is
                 only computed when 'status' is truthy
        '''
        # 252-row rolling std of daily returns, scaled by sqrt(252).
        self.data['STD'] = self.data['Close'].pct_change().rolling(252).std() \
            * np.sqrt(252)
        self.data['risk_units'] = self.data['STD'] * self.data['Close']
        # n_sigs doubles as the suffix that keeps column names unique.
        self.n_sigs = 0
        for kind, rules in self.signals.items():
            if kind == 'MAC':
                for params in rules.values():
                    self._calcMAC(params['fast'], params['slow'],
                                  params['scale'])
                    self.n_sigs += 1
            elif kind == 'MBO':
                for params in rules.values():
                    self._calcMBO(params['N'], params['scale'])
                    self.n_sigs += 1
            elif kind == 'CAR':
                for params in rules.values():
                    if params['status']:
                        self._calcCarry(params['scale'])
                        self.n_sigs += 1

    def _calcTotalSignal(self):
        '''
        Combine the forecast columns into self.data['signal'] as their
        dot product with self.signal_weights.
        '''
        # One matrix-vector product instead of a Python call per row;
        # a NaN in any forecast still yields NaN for that row.
        forecasts = self.data[self.signal_names].to_numpy(dtype=float)
        weights = np.asarray(self.signal_weights, dtype=float)
        self.data['signal'] = forecasts @ weights

    def _sizePosition(self, capital, price, instrument_risk, signal):
        '''
        Return the whole number of shares for the given forecast.

        Target exposure is target_risk * capital * |signal| /
        instrument_risk; the result is capped at the number of whole
        shares that capital can buy.
        '''
        exposure = self.target_risk * capital * \
            np.abs(signal) / instrument_risk
        shares = np.floor(exposure / price)
        if shares * price > capital:
            return np.floor(capital / price)
        return shares

    def _getExposureDrift(self, cash, position, price, signal,
                          instrument_risk):
        '''
        Return (drift, avg_exposure) for the current holding.

        avg_exposure is the exposure of a unit-magnitude forecast with
        the sign of signal; drift is (target exposure - current
        exposure) / avg_exposure. Returns (0, 0) when there is no
        position or when avg_exposure is zero.
        '''
        if position == 0:
            return 0, 0
        capital = cash + price * position
        avg_exposure = self.target_risk * capital / instrument_risk * \
            np.sign(signal)
        # Zero signal or zero capital: nothing to scale the drift by.
        if avg_exposure == 0:
            return 0, 0
        exposure = self.target_risk * capital * signal / instrument_risk
        cur_exposure = price * position
        return (exposure - cur_exposure) / avg_exposure, avg_exposure

    def run(self):
        '''
        Run the backtest row by row over self.data.

        Writes the 'position', 'cash', 'portfolio', 'rebalance' and
        'exposure_drift' columns and then replaces self.data with the
        result of calcReturns(self.data).
        '''
        n_rows = self.data.shape[0]
        position = np.zeros(n_rows)
        rebalance = np.zeros(n_rows)
        exp_delta = np.zeros(n_rows)
        cash = np.zeros(n_rows)
        for i, (_, row) in enumerate(self.data.iterrows()):
            price = row['Close']
            if np.isnan(row.values).any():
                # NOTE(review): position[i] is still 0 here, so a NaN row
                # in the middle of the data drops an open position
                # without crediting cash. Left unchanged because
                # _calcCash is not visible from this class -- confirm
                # whether the position should be carried forward.
                cash[i] = self._calcCash(cash[i - 1], position[i],
                                         price, row['Dividends']) if i > 0 \
                    else self.starting_capital
                continue

            # Propagate values forward. The first row has no predecessor:
            # seed it explicitly rather than wrapping around to index -1.
            if i > 0:
                prev_position, prev_cash = position[i - 1], cash[i - 1]
            else:
                prev_position, prev_cash = 0.0, self.starting_capital
            position[i] = prev_position
            cash[i] = self._calcCash(prev_cash, position[i],
                                     price, row['Dividends'])

            signal = row['signal']
            if signal > 0:
                if position[i] <= 0:
                    # Close any short, then open a long sized on the
                    # freed capital.
                    cash[i] += position[i] * price
                    position[i] = self._sizePosition(
                        cash[i], price, row['STD'], signal)
                    cash[i] -= position[i] * price
            elif signal < 0:
                if position[i] >= 0:
                    # Close any long; go short only if shorting is on.
                    cash[i] += position[i] * price
                    if self.shorts:
                        position[i] = -self._sizePosition(
                            cash[i], price, row['STD'], signal)
                        cash[i] -= position[i] * price
                    else:
                        position[i] = 0
            else:
                # Remain neutral if signal == 0
                cash[i] += position[i] * price
                position[i] = 0

            # Check for rebalancing
            delta_exposure, avg_exposure = self._getExposureDrift(
                cash[i], position[i], price, signal, row['STD'])
            exp_delta[i] = delta_exposure
            if np.abs(delta_exposure) >= self.exposure_drift:
                # delta * avg_exposure is the exposure gap in currency.
                shares = np.round(delta_exposure * avg_exposure / price)
                cash[i] -= shares * price
                position[i] += shares
                rebalance[i] += shares

        self.data['position'] = position
        self.data['cash'] = cash
        self.data['portfolio'] = self.data['position'] * \
            self.data['Close'] + self.data['cash']
        self.data['rebalance'] = rebalance
        self.data['exposure_drift'] = exp_delta
        self.data = calcReturns(self.data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment