Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
"""
The starter system has the following features:
- a single market
- a binary forecast from a simple moving average (MAV) crossover
- exit via a trailing stop loss
- a fixed position size once in a trade
"""
import matplotlib
matplotlib.use("TkAgg")
from systems.defaults import system_defaults
from syscore.genutils import sign
from systems.provided.futures_chapter15.basesystem import *
from sysdata.configdata import Config
from systems.forecasting import TradingRule
from systems.positionsizing import PositionSizing
from systems.system_cache import diagnostic, output
from copy import copy
import numpy as np
import pandas as pd
from random import getrandbits
import matplotlib.pylab as plt
def simple_mav(price, short=16, long=64, forecast_fixed=10):
    """
    Binary forecast from a simple moving average crossover

    :param price: price series (anything offering ``.rolling``)
    :param short: window length of the fast moving average
    :param long: window length of the slow moving average
    :param forecast_fixed: absolute size of the forecast that is emitted
    :return: series equal to forecast_fixed times the sign of
             (fast average - slow average)
    """
    # min_periods=1: both averages are defined from the first observation
    fast_average = price.rolling(short, min_periods=1).mean()
    slow_average = price.rolling(long, min_periods=1).mean()

    crossover_direction = (fast_average - slow_average).apply(sign)

    return forecast_fixed * crossover_direction
class simpleSysystemPosition(object):
    """
    Tracks the position held in one instrument when open trades are closed by
    a trailing stop loss.

    The object is fed one observation at a time: call
    no_position_check_for_trade() while flat, and
    position_on_check_for_close() while a trade is open.

    :param dynamic_vol: if True, the held position and the stop loss gap are
        rescaled with the latest volatility; if False both use the volatility
        recorded when the trade was opened
    :param dynamic_SL: if True, the stop loss multiplier depends on the profit
        of the open trade (module level dynamic_xfactor); if False it is
        fixed_xfactor()
    """

    def __init__(self, dynamic_vol=False, dynamic_SL=False):
        self.current_position = 0.0
        # sign of the last closed trade, used to stop us re-entering the same way
        self.previous_position = 0.0
        self.dynamic_vol = dynamic_vol
        self.dynamic_SL = dynamic_SL

    def no_position_check_for_trade(self, original_position_now, current_price, current_vol):
        """
        Decide whether to open a trade; only valid while flat.

        :param original_position_now: position wanted before any stop loss logic
        :param current_price: latest price
        :param current_vol: latest volatility, in the same units as price
        :return: original_position_now if a trade is opened, otherwise 0.0
        """
        assert self.no_current_position

        if np.isnan(original_position_now):
            # no signal
            return 0.0

        if original_position_now == 0.0:
            return 0.0

        # potentially going long / short
        # check last position to avoid whipsaw
        if self.previous_position != 0.0:
            # same way round as the trade we were stopped out of: stay flat
            if sign(original_position_now) == sign(self.previous_position):
                return 0.0

        self.initialise_trade(original_position_now, current_price, current_vol)

        return original_position_now

    @property
    def no_current_position(self):
        """True when no trade is open"""
        return self.current_position == 0.0

    def initialise_trade(self, original_position_now, current_price, current_vol):
        """
        Record the state of a newly opened trade.

        :return: original_position_now
        """
        # okay to do this - we don't want to enter a new position unless sign changed
        # we set the position at the sized position at moment of
        # inception
        self.current_position = original_position_now
        self.price_list_since_position_held = [current_price]
        self.initial_vol = current_vol
        self.initial_position = original_position_now

        return original_position_now

    def position_on_check_for_close(self, current_price, current_vol):
        """
        Update an open trade with a new observation and close it if the
        trailing stop has been hit; only valid while a trade is open.

        :return: position for this observation. When the stop is hit this is
                 still the open position; the object is flat afterwards
        """
        assert not self.no_current_position

        # already holding a position
        # record the price so the high water mark can be calculated
        self.update_price_series(current_price)

        # must be worked out before close_trade(), which deletes the trade state
        new_position = self.vol_adjusted_position(current_vol)

        time_to_close_trade = self.check_if_hit_stoploss(current_vol)

        if time_to_close_trade:
            self.close_trade()

        return new_position

    def update_price_series(self, current_price):
        """Append the latest price to the prices seen since the trade opened"""
        self.price_list_since_position_held.append(current_price)

    def vol_adjusted_position(self, current_vol):
        """
        :return: the initial position, scaled by initial_vol / current_vol
                 when dynamic_vol is set
        """
        initial_position = self.initial_position
        if self.dynamic_vol:
            return (self.initial_vol / current_vol) * initial_position

        return initial_position

    def check_if_hit_stoploss(self, current_vol):
        """
        :return: True if the latest price has moved more than the stop loss
                 gap away from the high water mark
        """
        stoploss_gap = self.stoploss_gap(current_vol)

        sign_position = sign(self.current_position)
        if sign_position == 1:
            # long
            return self.check_if_long_stop_hit(stoploss_gap)

        # short
        return self.check_if_short_stop_hit(stoploss_gap)

    def stoploss_gap(self, current_vol):
        """
        :return: volatility times Xfactor; the volatility is current_vol when
                 dynamic_vol is set, otherwise the volatility at trade inception
        """
        xfactor = self.Xfactor

        if self.dynamic_vol:
            vol = current_vol
        else:
            vol = self.initial_vol

        return vol * xfactor

    @property
    def Xfactor(self):
        """Stop loss multiplier: profit dependent if dynamic_SL, else fixed"""
        if self.dynamic_SL:
            return self.dynamic_xfactor()

        return fixed_xfactor()

    def dynamic_xfactor(self):
        """
        :return: module level dynamic_xfactor() evaluated at the profit of the
                 open trade in units of initial volatility
        """
        pandl_vol_units = self.vol_adjusted_profit_since_trade_points()

        # inside a method this name resolves to the module level function
        return dynamic_xfactor(pandl_vol_units)

    def vol_adjusted_profit_since_trade_points(self):
        """
        :return: profit in price points divided by the volatility at trade
                 inception; 0.0 when flat
        """
        if self.no_current_position:
            return 0.0

        initial_vol = self.initial_vol
        profit_price_units = self.profit_since_trade_points()

        return profit_price_units / initial_vol

    def profit_since_trade_points(self):
        """
        :return: price move since the trade opened, signed so that a gain is
                 positive for both long and short trades
        """
        assert not self.no_current_position

        if self.current_position > 0:
            return self.current_price - self.initial_price

        return self.initial_price - self.current_price

    @property
    def current_price(self):
        """Most recent price recorded for the open trade"""
        return self.price_list_since_position_held[-1]

    @property
    def initial_price(self):
        """
        First non-missing price recorded for the open trade.

        A trade can be opened on an observation whose price is NaN. Returning
        that NaN would make the profit, and so the dynamic stop loss
        multiplier, NaN for the whole life of the trade and the stop could
        never be hit. Falls back to the first entry when every price is NaN.
        """
        price_list_since_position_held = self.price_list_since_position_held
        for price in price_list_since_position_held:
            if not np.isnan(price):
                return price

        # all NaN; an empty list raises IndexError exactly as before
        return price_list_since_position_held[0]

    def check_if_long_stop_hit(self, stoploss_gap):
        """True if the latest price is below the high water mark less the gap"""
        threshold = self.hwm - stoploss_gap

        return self.current_price < threshold

    def check_if_short_stop_hit(self, stoploss_gap):
        """True if the latest price is above the low water mark plus the gap"""
        threshold = self.hwm + stoploss_gap

        return self.current_price > threshold

    @property
    def hwm(self):
        """Best price for the open trade: highest if long, lowest if short"""
        if self.current_position > 0:
            return self.hwm_when_long()

        return self.hwm_when_short()

    def hwm_when_long(self):
        """Highest price since the trade opened, ignoring NaN"""
        return np.nanmax(self.price_list_since_position_held)

    def hwm_when_short(self):
        """Lowest price since the trade opened, ignoring NaN"""
        return np.nanmin(self.price_list_since_position_held)

    def close_trade(self):
        """Go flat, remembering the closed position and discarding trade state"""
        self.previous_position = copy(self.current_position)
        self.current_position = 0.0
        self.price_list_since_position_held = []
        del self.initial_vol
        del self.initial_position
def fixed_xfactor():
    """
    Stop loss multiplier used when the dynamic stop loss is switched off

    :return: the constant 8.0
    """
    constant_multiplier = 8.0

    return constant_multiplier
def dynamic_xfactor(
    pandl_vol_units,
    minimum_xfactor=2.0,
    maximum_xfactor=8.0,
    pandl_upper_cutoff=8.0,
    pandl_lower_cutoff=0.0,
):
    """
    Stop loss multiplier that grows with the profit of the open trade

    At or below the lower cutoff the minimum multiplier applies, above the
    upper cutoff the maximum applies, and in between the multiplier is
    interpolated linearly between the two.

    :param pandl_vol_units: profit of the open trade, in units of volatility
    :param minimum_xfactor: multiplier at or below pandl_lower_cutoff
    :param maximum_xfactor: multiplier above pandl_upper_cutoff
    :param pandl_upper_cutoff: profit above which the maximum applies
    :param pandl_lower_cutoff: profit at or below which the minimum applies
    :return: multiplier; a NaN profit falls through both comparisons and
             gives NaN
    """
    if pandl_vol_units <= pandl_lower_cutoff:
        return minimum_xfactor

    if pandl_vol_units > pandl_upper_cutoff:
        return maximum_xfactor

    # measure profit from the lower cutoff so the line starts at the minimum
    # multiplier even when that cutoff is not zero
    profit_above_lower_cutoff = pandl_vol_units - pandl_lower_cutoff

    return minimum_xfactor + profit_above_lower_cutoff * (
        maximum_xfactor - minimum_xfactor
    ) / (pandl_upper_cutoff - pandl_lower_cutoff)
def stoploss(price, vol, raw_position, dynamic_vol=False, dynamic_SL = False):
    """
    Apply trailing stoploss

    :param price: price series
    :param vol: volatility series on the same index as price,
                eg system.rawdata.daily_returns_volatility("SP500")
    :param raw_position: Raw position series, without stoploss or entry / exit logic
    :param dynamic_vol: passed to simpleSysystemPosition; rescale position and
                        stop loss gap with the latest volatility
    :param dynamic_SL: passed to simpleSysystemPosition; let the stop loss
                       multiplier depend on the profit of the open trade
    :return: New position series, on the index of raw_position
    """
    assert all(vol.index == price.index)
    assert all(price.index == raw_position.index)

    # assume all lined up
    simple_system_position = simpleSysystemPosition(
        dynamic_vol=dynamic_vol, dynamic_SL=dynamic_SL
    )
    new_position_list = []

    # Walk the underlying values in step. Looking rows up with series[integer]
    # relies on a positional fallback that pandas has deprecated for series
    # that are not integer indexed.
    observations = zip(price.values, vol.values, raw_position.values)
    for current_price, current_vol, original_position_now in observations:
        if simple_system_position.no_current_position:
            # no position, check for signal
            new_position = simple_system_position.no_position_check_for_trade(
                original_position_now, current_price, current_vol
            )
        else:
            new_position = simple_system_position.position_on_check_for_close(
                current_price, current_vol
            )

        new_position_list.append(new_position)

    new_position_df = pd.Series(new_position_list, raw_position.index)

    return new_position_df
class PositionSizeWithStopLoss(PositionSizing):
    """
    Position sizing stage whose subsystem position is passed through the
    trailing stop loss logic in stoploss()
    """

    @diagnostic()
    def get_subsystem_position_preliminary(self, instrument_code):
        """
        Get scaled position before any stop loss is applied (assuming for now
        we trade our entire capital for one instrument)

        :param instrument_code: instrument to size
        :return: volatility scalar * combined forecast / average absolute forecast
        """
        self.log.msg(
            "Calculating subsystem position for %s" % instrument_code,
            instrument_code=instrument_code,
        )

        # Always taken from the system defaults:
        # we don't allow this to be changed in config
        target_abs_forecast = system_defaults["average_absolute_forecast"]

        unaligned_vol_scalar = self.get_volatility_scalar(instrument_code)

        # forecast is binary + or - avg-abs-forecast
        combined_forecast = self.get_combined_forecast(instrument_code)

        aligned_vol_scalar = unaligned_vol_scalar.reindex(combined_forecast.index).ffill()

        # put on a position according to vol and sign of forecast; this will
        # only take effect on a new trade
        return aligned_vol_scalar * combined_forecast / target_abs_forecast

    @output()
    def get_subsystem_position(self, instrument_code):
        """
        Get the subsystem position after the trailing stop loss has been
        applied to the preliminary position

        :param instrument_code: instrument to size
        :return: whatever stoploss() returns for this instrument
        """
        rawdata = self.parent.rawdata
        price = rawdata.get_daily_prices(instrument_code)
        vol = rawdata.daily_returns_volatility(instrument_code)

        raw_position = self.get_subsystem_position_preliminary(instrument_code)

        # the two flags live on the system config
        return stoploss(
            price,
            vol,
            raw_position,
            dynamic_vol=self.parent.config.dynamic_vol,
            dynamic_SL=self.parent.config.dynamic_SL,
        )
# Trading rule built on simple_mav, overriding its default windows with
# short=10 and long=40
simple_mav_rule = TradingRule(
    dict(function=simple_mav, other_args=dict(long=40, short=10))
)
# Data object handed to every System built by system_given_flags()
# NOTE(review): csvFuturesSimData is not imported by name here; presumably it
# comes from the star import of basesystem - confirm
data = csvFuturesSimData()
## trade by trade p&l
## only works at subsystem level
def system_given_flags(dynamic_vol=False, dynamic_SL=False):
    """
    Build a system using the simple MAV rule and stop loss position sizing

    :param dynamic_vol: stored on the config as ``dynamic_vol``
    :param dynamic_SL: stored on the config as ``dynamic_SL``
    :return: System built on the module level ``data``, with logging set to "on"
    """
    config_contents = {
        "trading_rules": {"simple_mav": simple_mav_rule},
        "percentage_vol_target": 16.0,
        "notional_trading_capital": 100000000,
        "dynamic_vol": dynamic_vol,
        "dynamic_SL": dynamic_SL,
    }
    config = Config(config_contents)

    stages = [
        Account(),
        Portfolios(),
        PositionSizeWithStopLoss(),
        FuturesRawData(),
        ForecastCombine(),
        ForecastScaleCap(),
        Rules(simple_mav_rule),
    ]

    system = System(stages, data, config)
    system.set_logging_level("on")

    return system
def stats(stacked_returns):
    """
    Print the median Sharpe ratio and median skews across instruments

    :param stacked_returns: pair (list of returns p&l series, list of per
                            trade p&l series), one series per instrument
    """
    returns_by_instrument, trades_by_instrument = stacked_returns

    median_sharpe = sharpe_for_stacked(returns_by_instrument)
    print("SR %f" % median_sharpe)

    trade_skew = skew_for_stacked(trades_by_instrument)
    print("Skew trades %f" % trade_skew)

    daily_skew = skew_for_stacked(returns_by_instrument)
    print("Skew daily returns %f" % daily_skew)

    weekly_skew = skew_for_stacked(returns_by_instrument, period="1W")
    print("Skew weekly returns %f" % weekly_skew)

    monthly_skew = skew_for_stacked(returns_by_instrument, period="1M")
    print("Skew monthly returns %f" % monthly_skew)
def stacked_returns_over_instrument(system):
    """
    Collect returns p&l and per trade p&l for every instrument in the system

    :param system: system offering get_instrument_list()
    :return: pair of lists (returns p&l, trades p&l), in instrument order
    """
    pandl_pairs = [
        calc_returns_for_system_and_code(instrument, system)
        for instrument in system.get_instrument_list()
    ]

    stacked_returns_pandl = [returns_pandl for returns_pandl, _ in pandl_pairs]
    stacked_trades_pandl = [trades_pandl for _, trades_pandl in pandl_pairs]

    return stacked_returns_pandl, stacked_trades_pandl
def stack_to_df(stacked_returns, period=None):
    """
    Join a list of return series end to end

    :param stacked_returns: list of series
    :param period: optional resample frequency; if given each series is
                   summed over that frequency before joining
    :return: result of pd.concat along axis 0
    """
    if period is None:
        series_to_join = stacked_returns
    else:
        series_to_join = [
            one_series.resample(period).sum() for one_series in stacked_returns
        ]

    return pd.concat(series_to_join, axis=0)
def calc_returns_for_system_and_code(instrument_code: str, system: System):
    """
    P&L as a proportion of capital for one instrument, calculated both ways

    :return: pair (p&l from the "returns" method, p&l from the "trades" method)
    """
    by_returns = pandl_capital(instrument_code, system, method_used="returns")
    by_trades = pandl_capital(instrument_code, system, method_used="trades")

    return by_returns, by_trades
def sharpe_for_stacked(stacked_returns_pandl):
    """
    Median across instruments of the Sharpe ratio given by sharpe()

    :param stacked_returns_pandl: list of returns series, one per instrument
    :return: np.median of the individual Sharpe ratios
    """
    sharpe_by_instrument = list(map(sharpe, stacked_returns_pandl))

    return np.median(sharpe_by_instrument)
def sharpe(pandl_series_capital):
    """
    Mean of the series over its standard deviation, scaled up by 16

    :param pandl_series_capital: series of returns
    :return: 16 * mean / standard deviation
    """
    scale_factor = 16
    average_return = pandl_series_capital.mean()
    return_stdev = pandl_series_capital.std()

    return (scale_factor * average_return) / return_stdev
def skew_for_stacked(stacked_returns_pandl, period="1B"):
    """
    Median across instruments of the skew of p&l summed over each period

    :param stacked_returns_pandl: list of date indexed p&l series, one per
                                  instrument
    :param period: resample frequency over which p&l is summed before the
                   skew is measured
    :return: np.median of the individual skews
    """
    # A resampler has to be aggregated before a statistic can be taken.
    # min_count=1 keeps periods with no observations as missing rather than
    # zero, so they do not pull the skew of sparse series (eg one value per
    # trade) towards zero.
    resampled = [
        pandl_series_capital.resample(period).sum(min_count=1)
        for pandl_series_capital in stacked_returns_pandl
    ]
    skew_list = [pandl_series_capital.skew() for pandl_series_capital in resampled]

    return np.median(skew_list)
def pandl_capital(instrument_code: str, system: System, method_used: str = "returns"):
    """
    P&L for one instrument as a proportion of notional trading capital

    :param method_used: passed through to pandl_money()
    :return: money p&l divided by system.config.notional_trading_capital
    """
    money_pandl = pandl_money(instrument_code, system, method_used=method_used)

    return money_pandl / system.config.notional_trading_capital
def pandl_money(instrument_code: str, system: System, method_used: str= "returns"):
    """
    P&L for one instrument after applying the fx rate

    Pulls the subsystem position, daily prices, block value and fx rate from
    the system and hands them to pandl_base()

    :param method_used: passed through to pandl_base()
    :return: whatever pandl_base() returns
    """
    position_sizing = system.positionSize

    positions = position_sizing.get_subsystem_position(instrument_code)
    prices = system.rawdata.get_daily_prices(instrument_code)
    value_of_block_move = system.data.get_value_of_block_price_move(instrument_code)
    fx_rate = position_sizing.get_fx_rate(instrument_code)

    return pandl_base(
        prices, positions, value_of_block_move, fx_rate, method_used=method_used
    )
def pandl_base(price_series: pd.Series, pos_series: pd.DataFrame, block_size: float, fx: pd.Series,
               method_used = "returns"):
    """
    Local currency p&l multiplied by the fx rate

    :param fx: fx rate series; reindexed to the p&l dates and forward filled
    :param method_used: passed through to pandl_local_ccy()
    :return: local currency p&l * aligned fx rate
    """
    local_pandl = pandl_local_ccy(
        price_series, pos_series, block_size, method_used=method_used
    )
    aligned_fx = fx.reindex(local_pandl.index).ffill()

    return local_pandl * aligned_fx
def pandl_local_ccy(price_series, pos_series, block_size, method_used = "returns"):
    """
    P&L in price points multiplied by the block size

    :param method_used: "returns" for p&l at every position date; any other
                        value gives p&l per trade
    :return: points p&l * block_size
    """
    use_returns = method_used == "returns"
    # anything other than "returns" means trades
    points_calculator = pandl_returns_points if use_returns else pandl_trades_points

    return points_calculator(price_series, pos_series) * block_size
def pandl_returns_points(price_series, pos_series):
    """
    P&L in price points from holding the previous observation's position
    over each price change

    :param price_series: price series, NOT necessarily aligned to positions;
                         it is reindexed to the position dates with forward fill
    :type price_series: pd.Series
    :param pos_series: series of positions
    :type pos_series: pd.Series
    :returns: pd.Series on the index of pos_series; the first value is NaN
    """
    prices_at_position_dates = price_series.reindex(pos_series.index, method="ffill")
    price_changes = prices_at_position_dates.diff()

    # the position held coming into each date earns that date's price change
    position_held = pos_series.shift(1)

    return position_held * price_changes
def pandl_trades_points(price_series, pos_series):
    """
    P&L in price points grouped by trade

    :returns: pd.Series with one value per index found by
              get_trade_dates_idx_from_pos_series(), dated at those indices
    """
    points_pandl = pandl_returns_points(price_series, pos_series)
    trade_dates_idx = get_trade_dates_idx_from_pos_series(pos_series)

    pandl_by_trade = []
    for trade_number in range(len(trade_dates_idx)):
        pandl_by_trade.append(
            sum_between_trade_dates(trade_dates_idx, trade_number, points_pandl)
        )

    trade_dates = points_pandl.index[trade_dates_idx]

    return pd.Series(pandl_by_trade, index=trade_dates)
def get_trade_dates_idx_from_pos_series(pos_series):
    """
    Integer locations in pos_series at which traded() reports a trade

    The location of the final observation is appended if it is not already
    the last entry, so that the list always ends at the end of the data.

    :param pos_series: series of positions
    :return: ascending list of integer locations; empty if pos_series is empty
    """
    number_of_positions = len(pos_series)
    prev_position = pos_series.shift(1)

    trade_dates_idx = [
        idx
        for idx in range(number_of_positions)
        if traded(pos_series, prev_position, idx)
    ]

    if number_of_positions == 0:
        # nothing to close off
        return trade_dates_idx

    final_idx = number_of_positions - 1
    # check for an empty list first: indexing [-1] on it would raise IndexError
    if not trade_dates_idx or trade_dates_idx[-1] < final_idx:
        trade_dates_idx.append(final_idx)

    return trade_dates_idx
def traded(pos_series, prev_position, idx):
    """
    Did the sign of the position change at integer location idx?

    :param pos_series: series of positions
    :param prev_position: series of positions held one observation earlier
    :param idx: integer location (not an index label) to compare
    :return: True if the sign of the position differs from that of the
             previous position
    """
    # idx is a location, so use .iloc; plain [] with an integer on a series
    # that is not integer indexed is a deprecated positional fallback
    sign_now = sign(pos_series.iloc[idx])
    sign_before = sign(prev_position.iloc[idx])

    return bool(sign_now != sign_before)
def sum_between_trade_dates(trade_dates_idx, idx, returns):
    """
    Total return belonging to the trade that ends at trade_dates_idx[idx]

    The return at a location is earned by the position held coming into it,
    so the return on a trade date still belongs to the trade being closed.
    Each sum therefore runs from just after the previous trade date up to and
    including the current one. Taken over every idx the sums cover all of
    ``returns`` once, provided trade_dates_idx ends at the last location.

    :param trade_dates_idx: ascending integer locations of trade dates
    :param idx: which entry of trade_dates_idx to sum up to
    :param returns: series of returns, one per observation
    :return: sum of the returns in that window (missing values are skipped)
    """
    if idx == 0:
        first_idx = 0
    else:
        first_idx = trade_dates_idx[idx - 1] + 1

    # +1 because the end of a slice is exclusive
    last_idx_exclusive = trade_dates_idx[idx] + 1

    return returns.iloc[first_idx:last_idx_exclusive].sum()
# Script entry: build the system with both dynamic flags switched on, collect
# the p&l of every instrument, and print the summary statistics
system = system_given_flags(dynamic_vol=True, dynamic_SL=True)
stacked_returns = stacked_returns_over_instrument(system)
stats(stacked_returns)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment