Skip to content

Instantly share code, notes, and snippets.

@robcarver17
Created January 5, 2023 14:06
Show Gist options
  • Save robcarver17/795d11b977261065d1f0da68eb95624f to your computer and use it in GitHub Desktop.
Save robcarver17/795d11b977261065d1f0da68eb95624f to your computer and use it in GitHub Desktop.
import matplotlib
matplotlib.use("TkAgg")
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams.update({'font.size': 22})
import pandas as pd
import datetime
from systems.provided.basic.system import basic_db_futures_system
from systems.provided.rules.ewmac import ewmac_calc_vol
from systems.trading_rules import create_variations, TradingRule
from syscore.genutils import flatten_list
from syscore.dateutils import ROOT_BDAYS_INYEAR
# Build one EWMAC trading rule per Lfast value; every Lslow is four times
# its Lfast. Rule names are generated from the "Lfast" argument using the
# "ewmac_%s_%s" name format.
variations = create_variations(
    TradingRule(ewmac_calc_vol),
    [
        {"Lfast": fast_span, "Lslow": fast_span * 4}
        for fast_span in [2, 4, 8, 16, 32, 64]
    ],
    "Lfast",
    nameformat="ewmac_%s_%s",
)

system = basic_db_futures_system(trading_rules=variations)

# Fixed forecast scalars, one entry per rule variation created above.
system.config.forecast_scalars = {
    'ewmac_Lfast_2': 12.077,
    'ewmac_Lfast_4': 8.539940954709955,
    'ewmac_Lfast_8': 5.949404365193165,
    'ewmac_Lfast_16': 4.104172020369661,
    'ewmac_Lfast_32': 2.786994330124792,
    'ewmac_Lfast_64': 1.9093945630747895,
}
## need to have enough data
instrument_list = system.get_instrument_list(
    remove_short_history=True,
    remove_duplicates=True,
    remove_trading_restrictions=False,
    remove_bad_markets=False,
    days_required=365 * 1,
)

# First index entry of the data held for each instrument, sorted ascending so
# that element 0 is the earliest start date across all instruments.
all_start_dates = [
    system.data[instrument_code].index[0] for instrument_code in instrument_list
]
all_start_dates.sort()
first_date = all_start_dates[0]
end_date = datetime.date.today()

## need to bear in mind that perf may change over time
# Year-end dates between the earliest start date and today. The frequency is
# passed as an offset object instead of the "1Y" string alias: pandas 2.2
# deprecated that alias in favour of "YE", whereas YearEnd() is accepted by
# both older and newer pandas and yields the same December year-end dates.
fit_dates = pd.date_range(first_date, end_date, freq=pd.offsets.YearEnd())
## We assume costs are fixed across time since we don't have costs data, otherwise
## we'd just end up with lower costs when vol is higher
# Per-instrument cost as returned by get_SR_cost_per_trade_for_instrument,
# keyed by instrument code in instrument_list order.
cost_dict = {
    instrument_code: system.accounts.get_SR_cost_per_trade_for_instrument(
        instrument_code
    )
    for instrument_code in instrument_list
}

# Natural log of each cost, with the same keys and ordering as cost_dict.
log_cost_dict = {
    instrument_code: np.log(cost) for instrument_code, cost in cost_dict.items()
}
## First of all, let's just get for all periods
def acc_curves_for_rule_name(rule_name):
    """Collect the forecast p&l object of ``rule_name`` for every instrument.

    :param rule_name: name of the trading rule variation to evaluate
    :return: dict mapping each code in ``instrument_list`` to the result of
        ``system.accounts.pandl_for_instrument_forecast`` for that code
    """
    curves_by_instrument = {}
    for instrument_code in instrument_list:
        curves_by_instrument[instrument_code] = (
            system.accounts.pandl_for_instrument_forecast(instrument_code, rule_name)
        )
    return curves_by_instrument
list_of_rules = list(variations.keys())

# Nested lookup: acc_curve_all[rule_name][instrument_code] -> p&l object.
acc_curve_all = {
    rule_name: acc_curves_for_rule_name(rule_name) for rule_name in list_of_rules
}
## averages
def sr_for_rule(rule_name, curve_type: str):
    """Return the median Sharpe ratio of a rule across all instruments.

    :param rule_name: trading rule variation to evaluate
    :param curve_type: attribute name selected on each p&l object
        (this file uses "gross" and "net")
    :return: ``np.median`` of the per-instrument Sharpe ratios
    """
    # NOTE: np.median yields NaN if any single instrument's Sharpe is NaN.
    sharpe_by_instrument = [
        sr_for_rule_type_instrument(
            rule_name=rule_name,
            instrument_code=code,
            curve_type=curve_type,
        )
        for code in instrument_list
    ]
    return np.median(sharpe_by_instrument)
def sr_for_rule_type_instrument(rule_name, instrument_code, curve_type):
    """Return the Sharpe ratio of one rule on one instrument.

    :param rule_name: key into ``acc_curve_all``
    :param instrument_code: key into ``acc_curve_all[rule_name]``
    :param curve_type: name of the attribute to read from the stored p&l
        object before calling ``.sharpe()`` on it
    :return: whatever ``.sharpe()`` returns for the selected curve
    """
    account_curve = acc_curve_all[rule_name][instrument_code]
    selected_curve = getattr(account_curve, curve_type)
    return selected_curve.sharpe()
# Median gross Sharpe ratio per rule, indexed by rule name.
all_gross_sr_median = [
    sr_for_rule(name_of_rule, "gross") for name_of_rule in list_of_rules
]
#all_net_sr_median = [sr_for_rule(rule_name, "net") for rule_name in list_of_rules]
to_plot = pd.Series(all_gross_sr_median, index=list_of_rules)

## length of data
length_of_data = {
    instrument_code: system.data.length_of_history_in_days_for_instrument(
        instrument_code
    )
    for instrument_code in instrument_list
}

## scatter
# Rebinds to_plot: one row per instrument, history length alongside cost.
to_plot = pd.concat(
    [pd.Series(length_of_data), pd.Series(cost_dict)],
    axis=1,
    keys=['days', 'costs'],
)
## scatter plot for each rule
def list_of_overall_sr_for_rule(rule_name, curve_type="gross"):
    """List the Sharpe ratio of ``rule_name`` for each instrument.

    :param rule_name: trading rule variation to evaluate
    :param curve_type: curve attribute passed through to
        ``sr_for_rule_type_instrument`` (defaults to "gross")
    :return: list of Sharpe ratios in ``instrument_list`` order
    """
    sharpe_ratios = []
    for instrument_code in instrument_list:
        sharpe_ratios.append(
            sr_for_rule_type_instrument(
                rule_name, instrument_code, curve_type=curve_type
            )
        )
    return sharpe_ratios
# Scatter of per-instrument Sharpe ratio against log cost for a single rule,
# with an optional linear fit and a vertical line at the log cost ceiling.
curve_type = "net"
subset_for_low_cost_only = False  ## only works for net

rule_name = list_of_rules[5]
rule_turnover = system.accounts.forecast_turnover("EDOLLAR", rule_name)  ## pooled so doesn't matter
# NOTE(review): 0.13 looks like a cost budget divided by turnover to give the
# highest acceptable per-trade cost for this rule; confirm the constant.
max_cost = 0.13 / rule_turnover
log_max_cost = np.log(max_cost)

data = pd.DataFrame(
    dict(
        codes=instrument_list,
        # Look each cost up by instrument code so the rows stay aligned with
        # `codes` regardless of the ordering of log_cost_dict.
        log_costs=[
            log_cost_dict[instrument_code] for instrument_code in instrument_list
        ],
        sr=list_of_overall_sr_for_rule(rule_name, curve_type=curve_type),
    )
)

if subset_for_low_cost_only:
    # Keep only instruments whose cost is below 110% of the ceiling.
    data = data[data.log_costs < (np.log(max_cost * 1.1))]

which_costs = "log_costs"

from sklearn.linear_model import LinearRegression

# Creating a Linear Regression model on our data
lin = LinearRegression()
lin.fit(data[[which_costs]], data['sr'])
r2 = lin.score(data[[which_costs]], data['sr'])

# Creating a plot
# The fitted line (and the R squared title) is shown for gross curves and for
# the low-cost subset; the full net sample gets the scatter only.
show_fit = curve_type == "gross" or subset_for_low_cost_only
if show_fit:
    plot_title = "%s Rsquared %.3f" % (rule_name, r2)
else:
    plot_title = "%s net" % (rule_name)

ax = data.plot.scatter(x=which_costs, y='sr', title=plot_title)
if show_fit:
    ax.plot(data[which_costs], lin.predict(data[[which_costs]]), c='r')
if curve_type != "gross":
    ## plot vertical cost line
    plt.axvline(log_max_cost, c="r")
### OPTIMAL TRADING SPEED
# Speed index of each rule, in list_of_rules order: 1 for the first rule up to
# N for the last. Derived from the rule count so it always matches the number
# of Sharpe ratios it is multiplied with (1..6 for the six rules in this file).
speed_as_list = np.arange(1, len(list_of_rules) + 1)


def optimal_trading_rule_for_instrument(instrument_code, curve_type="gross",
                                        return_nan: bool = False):
    """Return the Sharpe-ratio-weighted average speed index for an instrument.

    Each rule's Sharpe ratio on the instrument is floored at zero, the floored
    values are normalised to sum to one, and those weights are applied to
    ``speed_as_list``.

    :param instrument_code: instrument to evaluate
    :param curve_type: curve attribute passed to
        ``sr_for_rule_type_instrument`` (defaults to "gross")
    :param return_nan: what to return when the floored Sharpe ratios sum to
        zero: ``np.nan`` if True, otherwise one more than the largest speed
        index (7.0 with six rules)
    :return: weighted average speed index, or the fallback described above
    """
    sr_by_rule = pd.Series(
        [
            sr_for_rule_type_instrument(
                rule_name, instrument_code, curve_type=curve_type
            )
            for rule_name in list_of_rules
        ]
    )
    # Rules with a negative Sharpe ratio get no weight.
    sr_by_rule[sr_by_rule < 0] = 0

    total_sr = sr_by_rule.sum()
    if total_sr == 0:
        # No rule has a positive Sharpe ratio, so there is nothing to weight by.
        if return_nan:
            return np.nan
        return float(len(speed_as_list) + 1)

    sr_by_rule_as_weight = sr_by_rule / total_sr
    weight_by_speed = sr_by_rule_as_weight * speed_as_list
    return weight_by_speed.sum()
def optimal_speeds(curve_type="gross", return_nan = False):
    """Compute the optimal speed for every instrument.

    :param curve_type: curve attribute forwarded to
        ``optimal_trading_rule_for_instrument`` (defaults to "gross")
    :param return_nan: forwarded unchanged to
        ``optimal_trading_rule_for_instrument``
    :return: list of results in ``instrument_list`` order
    """
    speeds = []
    for instrument_code in instrument_list:
        speeds.append(
            optimal_trading_rule_for_instrument(
                instrument_code,
                curve_type=curve_type,
                return_nan=return_nan,
            )
        )
    return speeds
# Scatter of per-instrument optimal speed against log cost, with an optional
# linear fit and a vertical line at the log cost ceiling.
curve_type = "net"
subset_for_low_cost_only = True  ## only works for net
return_nan = True

## max instrument cost with very slowest rule
# The slowest rule is the last entry of list_of_rules (index 5 with six rules).
rule_name = list_of_rules[-1]
rule_turnover = system.accounts.forecast_turnover("EDOLLAR", rule_name)  ## pooled so doesn't matter
# NOTE(review): 0.13 looks like a cost budget divided by turnover to give the
# highest acceptable per-trade cost for this rule; confirm the constant.
max_cost = 0.13 / rule_turnover
log_max_cost = np.log(max_cost)

data = pd.DataFrame(
    dict(
        codes=instrument_list,
        # Look each cost up by instrument code so the rows stay aligned with
        # `codes` regardless of the ordering of log_cost_dict.
        log_costs=[
            log_cost_dict[instrument_code] for instrument_code in instrument_list
        ],
        optimal_speed=optimal_speeds(curve_type=curve_type, return_nan=return_nan),
    )
)
data = data.dropna()  # in case we return_nan

if subset_for_low_cost_only:
    # Keep only instruments whose cost is below 110% of the ceiling.
    data = data[data.log_costs < (np.log(max_cost * 1.1))]

which_costs = "log_costs"

from sklearn.linear_model import LinearRegression

# Creating a Linear Regression model on our data
lin = LinearRegression()
lin.fit(data[[which_costs]], data['optimal_speed'])
r2 = lin.score(data[[which_costs]], data['optimal_speed'])

# Creating a plot
# The fitted line (and the R squared title) is shown for gross curves and for
# the low-cost subset; the full net sample gets the scatter only.
show_fit = curve_type == "gross" or subset_for_low_cost_only
if curve_type == "gross":
    plot_title = "Optimal speed Rsquared %.3f" % (r2)
elif subset_for_low_cost_only:
    plot_title = "Optimal speed (net) Rsquared %.3f" % (r2)
else:
    plot_title = "Optimal speed (net)"

ax = data.plot.scatter(x=which_costs, y='optimal_speed', title=plot_title)
if show_fit:
    ax.plot(data[which_costs], lin.predict(data[[which_costs]]), c='r')
if curve_type != "gross":
    ## plot vertical cost line
    plt.axvline(log_max_cost, c="r")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment