@robcarver17
Created November 3, 2020 15:07
### The following code is boilerplate for optimising
import matplotlib
matplotlib.use("TkAgg")
import datetime
import pandas as pd
from scipy.optimize import minimize
import numpy as np
from scipy.stats import norm
from collections import namedtuple
def optimise_for_corr_matrix(corr_matrix):
    mean_list = [.05] * 3
    std = .1  # scalar standard deviation, applied to all three assets
    return optimise_inner(mean_list, corr_matrix, std)

def optimise_inner(mean_list, corr_matrix, std):
    stdev_list = [std] * len(mean_list)
    sigma = sigma_from_corr_and_std(stdev_list, corr_matrix)
    return optimise_with_sigma(sigma, mean_list)
def optimise_with_sigma(sigma, mean_list):
    mus = np.array(mean_list, ndmin=2).transpose()
    number_assets = sigma.shape[1]
    start_weights = [1.0 / number_assets] * number_assets
    # Constraints - positive weights, adding to 1.0
    bounds = [(0.0, 1.0)] * number_assets
    cdict = [{'type': 'eq', 'fun': addem}]
    ans = minimize(
        neg_SR,
        start_weights, (sigma, mus),
        method='SLSQP',
        bounds=bounds,
        constraints=cdict,
        tol=0.00001)
    weights = ans['x']
    return weights
def neg_SR(weights, sigma, mus):
    # Returns minus the Sharpe Ratio (as we're minimising)
    weights = np.matrix(weights)
    estreturn = (weights * mus)[0, 0]
    std_dev = (variance(weights, sigma) ** .5)
    return -estreturn / std_dev

def variance(weights, sigma):
    # returns the variance (NOT standard deviation) given weights and sigma
    return (weights * sigma * weights.transpose())[0, 0]
def sigma_from_corr_and_std(stdev_list, corrmatrix):
    # Covariance matrix: sigma[i, j] = stdev[i] * corr[i, j] * stdev[j]
    # (row vector * matrix * column vector, broadcast elementwise)
    stdev_row = np.array(stdev_list, ndmin=2)
    stdev_col = stdev_row.transpose()
    sigma = stdev_col * corrmatrix * stdev_row
    return sigma
def addem(weights):
    # Used for constraints
    return 1.0 - sum(weights)
labelledCorrelations = namedtuple("labelledCorrelations", 'ab ac bc')
def three_asset_corr_matrix(labelled_correlations):
    """
    :return: np.array, 2 dimensions, size 3 x 3
    """
    ab = labelled_correlations.ab
    ac = labelled_correlations.ac
    bc = labelled_correlations.bc
    m = [[1.0, ab, ac], [ab, 1.0, bc], [ac, bc, 1.0]]
    m = np.array(m)
    return m
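# Illustrative sanity check (not in the original gist): with identical means and
# standard deviations, a symmetric correlation matrix should produce roughly
# equal weights. Uncomment to run:
# eq_corr = three_asset_corr_matrix(labelledCorrelations(ab=0.5, ac=0.5, bc=0.5))
# print(optimise_for_corr_matrix(eq_corr))  # approximately [0.33, 0.33, 0.33]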
def get_correlation_distribution_point(corr_value, data_points, conf_interval):
    fisher_corr = fisher_transform(corr_value)
    point_in_fisher_units = \
        get_fisher_confidence_point(fisher_corr, data_points, conf_interval)
    point_in_natural_units = inverse_fisher(point_in_fisher_units)
    return point_in_natural_units
def fisher_transform(corr_value):
    if corr_value >= 1.0:
        corr_value = 0.99999999999999
    elif corr_value <= -1.0:
        corr_value = -0.99999999999999
    return 0.5 * np.log((1 + corr_value) / (1 - corr_value))  # also arctanh
def get_fisher_confidence_point(fisher_corr, data_points, conf_interval):
    if conf_interval < 0.5:
        confidence_in_fisher_units = fisher_confidence(data_points, conf_interval)
        point_in_fisher_units = fisher_corr - confidence_in_fisher_units
    elif conf_interval > 0.5:
        confidence_in_fisher_units = fisher_confidence(data_points, 1 - conf_interval)
        point_in_fisher_units = fisher_corr + confidence_in_fisher_units
    else:
        point_in_fisher_units = fisher_corr
    return point_in_fisher_units
def fisher_confidence(data_points, conf_interval):
    data_point_root = fisher_stdev(data_points) * 4  # standard error, scaled up by a factor of 4
    conf_point = get_confidence_point(conf_interval)
    return data_point_root * conf_point

def fisher_stdev(data_points):
    # Standard error of a Fisher-transformed correlation: 1 / sqrt(N - 3)
    data_point_root = 1 / ((data_points - 3) ** .5)
    return data_point_root

def get_confidence_point(conf_interval):
    conf_point = norm.ppf(1 - (conf_interval / 2))
    return conf_point
def inverse_fisher(fisher_corr_value):
    return (np.exp(2 * fisher_corr_value) - 1) / (np.exp(2 * fisher_corr_value) + 1)
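# Illustrative example (not in the original gist) of the Fisher-based sampling
# distribution: the 0.5 point is the estimated correlation itself, while lower
# and higher confidence points straddle it. Uncomment to run:
# print(get_correlation_distribution_point(0.5, 100, 0.5))  # exactly 0.5 (median)
# print(get_correlation_distribution_point(0.5, 100, 0.1))  # below 0.5
# print(get_correlation_distribution_point(0.5, 100, 0.9))  # above 0.5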
def optimise_for_corr_matrix_with_uncertainty(corr_matrix, conf_intervals, data_points):
    labelled_correlations = extract_asset_pairwise_correlations_from_matrix(corr_matrix)
    labelled_correlation_points = calculate_correlation_points_from_tuples(labelled_correlations, conf_intervals, data_points)
    corr_matrix = three_asset_corr_matrix(labelled_correlation_points)
    weights = optimise_for_corr_matrix(corr_matrix)
    return weights
def extract_asset_pairwise_correlations_from_matrix(corr_matrix):
    ab = corr_matrix[0][1]
    ac = corr_matrix[0][2]
    bc = corr_matrix[1][2]
    return labelledCorrelations(ab=ab, ac=ac, bc=bc)
def calculate_correlation_points_from_tuples(labelled_correlations, conf_intervals, data_points):
    correlation_point_list = [get_correlation_distribution_point(corr_value, data_points, confidence_interval)
                              for corr_value, confidence_interval in
                              zip(labelled_correlations, conf_intervals)]
    labelled_correlation_points = labelledCorrelations(*correlation_point_list)
    return labelled_correlation_points
def optimised_weights_given_correlation_uncertainty(corr_matrix, data_points, p_step=0.1):
    dist_points = np.arange(p_step, stop=(1 - p_step) + 0.000001, step=p_step)
    list_of_weights = []
    for conf1 in dist_points:
        for conf2 in dist_points:
            for conf3 in dist_points:
                conf_intervals = labelledCorrelations(conf1, conf2, conf3)
                weights = optimise_for_corr_matrix_with_uncertainty(corr_matrix, conf_intervals, data_points)
                list_of_weights.append(weights)
    array_of_weights = np.array(list_of_weights)
    average_weights = np.nanmean(array_of_weights, axis=0)
    return average_weights
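# Illustrative usage (not in the original gist): with the default p_step=0.1
# there are 9 confidence points per pairwise correlation, so 9**3 = 729
# optimisations are averaged. Uncomment to run (takes a little while):
# corr = three_asset_corr_matrix(labelledCorrelations(ab=0.9, ac=0.5, bc=0.5))
# print(optimised_weights_given_correlation_uncertainty(corr, data_points=100))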
def apply_min_weight(average_weights):
    weights_with_min = [min_weight(weight) for weight in average_weights]
    adj_weights = weights_sum_to_total(weights_with_min)
    return adj_weights

def min_weight(weight):
    if weight < 0.1:
        return 0.1
    else:
        return weight

def weights_sum_to_total(weights_with_min):
    sum_weights = np.sum(weights_with_min)
    adj_weights = weights_with_min / sum_weights
    return adj_weights
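# Worked example (not in the original gist): a weight below 0.1 is floored at
# 0.1 and the vector renormalised to sum to one:
# apply_min_weight(np.array([0.05, 0.45, 0.50]))
# -> [0.1, 0.45, 0.50] / 1.05 = approximately [0.095, 0.429, 0.476]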
# get some data
from systems.provided.futures_chapter15.basesystem import futures_system
system = futures_system()
## underlying returns
#instrument_list = ["US10", "SP500", "US5"] # sub portfolio
config = system.config
del config.instrument_weights
system = futures_system(config=config)
instrument_list = system.get_instrument_list()
perc_returns = [system.rawdata.get_percentage_returns(instrument_code) for instrument_code in instrument_list]
perc_returns_df = pd.concat(perc_returns, axis=1)
perc_returns_df.columns = instrument_list
## subsystem returns
subsys_returns = [system.accounts.pandl_for_subsystem(instrument_code) for instrument_code in instrument_list]
subsys_returns_df = pd.concat(subsys_returns, axis=1)
subsys_returns_df.columns = instrument_list
## trading rule returns
def get_rule_returns(instrument_code):
    rules = list(system.rules.trading_rules().keys())
    rule_returns = [system.accounts.pandl_for_instrument_forecast(instrument_code, rule) for rule in rules]
    rule_returns_df = pd.concat(rule_returns, axis=1)
    rule_returns_df.columns = rules
    return rule_returns_df
def get_forecast_and_future_corr(Nweeks_back, Nweeks_forward):
    forecast = get_historic_correlations(Nweeks_back)
    future = get_future_correlations(Nweeks_forward)  # for fixed period
    pd_result = merge_forecast_and_future(forecast, future, Nweeks_forward)
    return pd_result
def merge_forecast_and_future(forecast, future, Nweeks_forward):
    assets = forecast.columns  # assumed identical to future.columns; not checked
    pd_result = []
    for asset in assets:
        result_for_asset = pd.concat([forecast[asset], future[asset]], axis=1)
        # remove tail with nothing in it
        result_for_asset = result_for_asset[:-Nweeks_forward]
        # remove overlapping periods which bias R^2
        selector = range(0, len(result_for_asset.index), Nweeks_forward)
        result_for_asset = result_for_asset.iloc[selector]
        result_for_asset.columns = ['forecast', 'turnout']
        pd_result.append(result_for_asset)
    pd_result = pd.concat(pd_result, axis=0)
    return pd_result
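# Illustration of the non-overlap logic above: with Nweeks_forward = 52 the
# .iloc[selector] step keeps every 52nd weekly row, so successive 'turnout'
# windows share no data and the regression R^2 below is not inflated by
# overlapping observations.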
def get_future_correlations(Nweeks_forward):
    corr = get_rolling_correlations(use_returns, Nweeks_forward)
    corr = corr.ffill()
    future_corr = corr.shift(-Nweeks_forward)
    return future_corr

def get_historic_correlations(Nweeks_back):
    corr = get_rolling_correlations(use_returns, Nweeks_back)
    corr = corr.ffill()
    return corr

def get_rolling_correlations(use_returns, Nperiods):
    roll_df = use_returns.rolling(Nperiods, min_periods=4).corr()
    perm_names = get_asset_perm_names(use_returns)
    roll_list = [get_rolling_corr_for_perm_pair(perm_pair, roll_df) for perm_pair in perm_names]
    roll_list_df = pd.concat(roll_list, axis=1)
    roll_list_df.columns = ["%s/%s" % (asset1, asset2) for (asset1, asset2) in perm_names]
    return roll_list_df
def get_asset_perm_names(use_returns):
    asset_names = use_returns.columns
    permlist = []
    for asset1 in asset_names:
        for asset2 in asset_names:
            if asset1 == asset2:
                continue
            pairing = [asset1, asset2]
            if pairing in permlist:
                continue
            pairing.reverse()
            if pairing in permlist:
                continue
            permlist.append(pairing)
    return permlist
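# Example (not in the original gist): for columns ['A', 'B', 'C'] this returns
# the three unique pairs [['B', 'A'], ['C', 'A'], ['C', 'B']]; the order within
# each pair reflects the reverse() used for de-duplication, which is harmless
# because correlation is symmetric.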
def get_rolling_corr_for_perm_pair(perm_pair, roll_df):
    # roll_df has a (date, asset) MultiIndex; this slices out one pairwise series
    return roll_df[perm_pair[0]][:, perm_pair[1]]
use_returns = subsys_returns_df
use_returns = use_returns[datetime.datetime(1998, 1, 1):]  # common timestamp
use_returns = use_returns.resample("5B").sum()  # good enough approx for weekly returns
import matplotlib.pyplot as pyplot
pyplot.rcParams.update({'font.size': 16})
#get_rolling_correlations(use_returns, 52).plot()
Nweeks_list = [4, 7, 13, 26, 52, 104, 208, 520]
import statsmodels.api as sm
Nweeks_forward = 52  # must be set before the loop; use 13 weeks for underlying returns, 52 for others
r_squared = []
for Nweeks_back in Nweeks_list:
    print(Nweeks_back)
    pd_result = get_forecast_and_future_corr(Nweeks_back, Nweeks_forward)
    pd_result = pd_result.replace([np.inf, -np.inf], np.nan)
    pd_result = pd_result.dropna()
    x = pd_result.forecast
    x = sm.add_constant(x)
    y = pd_result.turnout
    est = sm.OLS(y, x).fit()
    r2 = est.rsquared_adj
    r_squared.append(r2)
ax = pyplot.gca()
ax.scatter(Nweeks_list, r_squared)
ax.set_xscale('log')
## for trading rules a bit more complex
Nweeks_forward = 52  # use 13 weeks for underlying returns, 52 for others
r_squared = []
for Nweeks_back in Nweeks_list:
    print(Nweeks_back)
    all_r2 = []
    for instrument_code in instrument_list:
        print("%s/%s" % (instrument_code, Nweeks_back))
        use_returns = get_rule_returns(instrument_code)
        use_returns = use_returns[datetime.datetime(1998, 1, 1):]  # common timestamp
        use_returns = use_returns.resample("5B").sum()  # good enough approx for weekly returns
        pd_result = get_forecast_and_future_corr(Nweeks_back, Nweeks_forward)
        pd_result = pd_result.replace([np.inf, -np.inf], np.nan)
        pd_result = pd_result.dropna()
        x = pd_result.forecast
        x = sm.add_constant(x)
        y = pd_result.turnout
        est = sm.OLS(y, x).fit()
        r2_this_instr = est.rsquared_adj
        all_r2.append(r2_this_instr)
    r2 = np.mean(all_r2)
    r_squared.append(r2)
ax = pyplot.gca()
ax.scatter(Nweeks_list, r_squared)
ax.set_xscale('log')