### Following code is boilerplate for optimising
import matplotlib
import pandas as pd
from scipy.optimize import minimize
import numpy as np
from scipy.stats import norm
from collections import namedtuple
def optimise_for_corr_matrix(corr_matrix):
mean_list = [.05]*3
std = [.1]*3
return optimise_inner(mean_list, corr_matrix, std)
def optimise_inner(mean_list, corr_matrix, std):
stdev_list = [std]*len(mean_list)
sigma = sigma_from_corr_and_std(stdev_list, corr_matrix)
return optimise_with_sigma(sigma, mean_list)
def optimise_with_sigma(sigma, mean_list):
mus = np.array(mean_list, ndmin=2).transpose()
number_assets = sigma.shape[1]
start_weights = [1.0 / number_assets] * number_assets
# Constraints - positive weights, adding to 1.0
bounds = [(0.0, 1.0)] * number_assets
cdict = [{'type': 'eq', 'fun': addem}]
ans = minimize(
start_weights, (sigma, mus),
weights = ans['x']
return weights
def neg_SR(weights, sigma, mus):
# Returns minus the Sharpe Ratio (as we're minimising)
weights = np.matrix(weights)
estreturn = (weights * mus)[0, 0]
std_dev = (variance(weights, sigma)**.5)
return -estreturn / std_dev
def variance(weights, sigma):
# returns the variance (NOT standard deviation) given weights and sigma
return (weights * sigma * weights.transpose())[0, 0]
def sigma_from_corr_and_std(stdev_list, corrmatrix):
stdev = np.array(stdev_list, ndmin=2).transpose()
sigma = stdev * corrmatrix * stdev
return sigma
def addem(weights):
# Used for constraints
return 1.0 - sum(weights)
labelledCorrelations = namedtuple("labelledCorrelations", 'ab ac bc')
def three_asset_corr_matrix(labelled_correlations):
:return: np.array 2 dimensions, size
ab = labelled_correlations.ab
ac =
bc = labelled_correlations.bc
m = [[1.0, ab, ac], [ab, 1.0, bc], [ac, bc, 1.0]]
m = np.array(m)
return m
def get_correlation_distribution_point(corr_value, data_points, conf_interval):
fisher_corr = fisher_transform(corr_value)
point_in_fisher_units = \
get_fisher_confidence_point(fisher_corr, data_points, conf_interval)
point_in_natural_units = inverse_fisher(point_in_fisher_units)
return point_in_natural_units
def fisher_transform(corr_value):
if corr_value>=1.0:
corr_value = 0.99999999999999
elif corr_value<=-1.0:
corr_value = -0.99999999999999
return 0.5*np.log((1+corr_value) / (1-corr_value)) # also arctanh
def get_fisher_confidence_point(fisher_corr, data_points, conf_interval):
if conf_interval<0.5:
confidence_in_fisher_units = fisher_confidence(data_points, conf_interval)
point_in_fisher_units = fisher_corr - confidence_in_fisher_units
elif conf_interval>0.5:
confidence_in_fisher_units = fisher_confidence(data_points, 1-conf_interval)
point_in_fisher_units = fisher_corr + confidence_in_fisher_units
point_in_fisher_units = fisher_corr
return point_in_fisher_units
def fisher_confidence(data_points, conf_interval):
data_point_root =fisher_stdev(data_points)*4
conf_point = get_confidence_point(conf_interval)
return data_point_root * conf_point
def fisher_stdev(data_points):
data_point_root = 1/((data_points-3)**.5)
return data_point_root
def get_confidence_point(conf_interval):
conf_point = norm.ppf(1-(conf_interval/2))
return conf_point
def inverse_fisher(fisher_corr_value):
return (np.exp(2*fisher_corr_value) - 1) / (np.exp(2*fisher_corr_value) + 1)
def optimise_for_corr_matrix_with_uncertainty(corr_matrix, conf_intervals, data_points):
labelled_correlations = extract_asset_pairwise_correlations_from_matrix(corr_matrix)
labelled_correlation_points = calculate_correlation_points_from_tuples(labelled_correlations, conf_intervals, data_points)
corr_matrix = three_asset_corr_matrix(labelled_correlation_points)
weights = optimise_for_corr_matrix(corr_matrix)
return weights
def extract_asset_pairwise_correlations_from_matrix(corr_matrix):
ab = corr_matrix[0][1]
ac = corr_matrix[0][2]
bc = corr_matrix[1][2]
return labelledCorrelations(ab=ab, ac=ac, bc=bc)
def calculate_correlation_points_from_tuples(labelled_correlations, conf_intervals, data_points):
correlation_point_list = [get_correlation_distribution_point(corr_value, data_points, confidence_interval)
for corr_value, confidence_interval in
zip(labelled_correlations, conf_intervals)]
labelled_correlation_points = labelledCorrelations(*correlation_point_list)
return labelled_correlation_points
def optimised_weights_given_correlation_uncertainty(corr_matrix, data_points, p_step=0.1):
dist_points = np.arange(p_step, stop=(1-p_step)+0.000001, step=p_step)
list_of_weights = []
for conf1 in dist_points:
for conf2 in dist_points:
for conf3 in dist_points:
conf_intervals = labelledCorrelations(conf1, conf2, conf3)
weights = optimise_for_corr_matrix_with_uncertainty(corr_matrix, conf_intervals, data_points)
array_of_weights = np.array(list_of_weights)
average_weights = np.nanmean(array_of_weights, axis=0)
return average_weights
def apply_min_weight(average_weights):
weights_with_min = [min_weight(weight) for weight in average_weights]
adj_weights = weights_sum_to_total(weights_with_min)
return adj_weights
def min_weight(weight):
if weight<0.1:
return 0.1
return weight
def weights_sum_to_total(weights_with_min):
sum_weights = np.sum(weights_with_min)
adj_weights = weights_with_min / sum_weights
return adj_weights
# get some data
from systems.provided.futures_chapter15.basesystem import futures_system
system = futures_system()
## underlying returns
#instrument_list = ["US10", "SP500", "US5"] # sub portfolio
config = system.config
system = futures_system(config=config)
instrument_list = system.get_instrument_list()
perc_returns = [system.rawdata.get_percentage_returns(instrument_code) for instrument_code in instrument_list]
perc_returns_df = pd.concat(perc_returns, axis=1)
perc_returns_df.columns = instrument_list
## subsystem returns
subsys_returns = [system.accounts.pandl_for_subsystem(instrument_code) for instrument_code in instrument_list]
subsys_returns_df = pd.concat(subsys_returns, axis=1)
subsys_returns_df.columns = instrument_list
## trading rule returns
def get_rule_returns(instrument_code):
rules = list(system.rules.trading_rules().keys())
rule_returns = [system.accounts.pandl_for_instrument_forecast(instrument_code, rule) for rule in rules]
rule_returns_df = pd.concat(rule_returns, axis=1)
rule_returns_df.columns = rules
return rule_returns_df
def get_forecast_and_future_corr(Nweeks_back, Nweeks_forward):
forecast = get_historic_correlations(Nweeks_back)
future = get_future_correlations(Nweeks_forward) # for fixed period
pd_result = merge_forecast_and_future(forecast, future, Nweeks_forward)
return pd_result
def merge_forecast_and_future(forecast, future, Nweeks_forward):
assets = forecast.columns # should be the same won't check
pd_result = []
for asset in assets:
result_for_asset = pd.concat([forecast[asset], future[asset]], axis=1)
# remove tail with nothing
result_for_asset = result_for_asset[:-Nweeks_forward]
# remove overlapping periods which bias R^2
selector = range(0, len(result_for_asset.index), Nweeks_forward)
result_for_asset = result_for_asset.iloc[selector]
result_for_asset.columns = ['forecast', 'turnout']
pd_result = pd.concat(pd_result, axis=0)
return pd_result
def get_future_correlations(Nweeks_forward):
corr = get_rolling_correlations(use_returns, Nweeks_forward)
corr = corr.ffill()
future_corr = corr.shift(-Nweeks_forward)
return future_corr
def get_historic_correlations(Nweeks_back):
corr = get_rolling_correlations(use_returns, Nweeks_back)
corr = corr.ffill()
return corr
def get_rolling_correlations(use_returns, Nperiods):
roll_df = use_returns.rolling(Nperiods, min_periods=4).corr()
perm_names = get_asset_perm_names(use_returns)
roll_list = [get_rolling_corr_for_perm_pair(perm_pair, roll_df) for perm_pair in perm_names]
roll_list_df = pd.concat(roll_list, axis=1)
roll_list_df.columns = ["%s/%s" % (asset1, asset2) for (asset1, asset2) in perm_names]
return roll_list_df
def get_asset_perm_names(use_returns):
asset_names = use_returns.columns
permlist = []
for asset1 in asset_names:
for asset2 in asset_names:
if asset1==asset2:
pairing = [asset1, asset2]
if pairing in permlist:
if pairing in permlist:
return permlist
def get_rolling_corr_for_perm_pair(perm_pair, roll_df):
return roll_df[perm_pair[0]][:,perm_pair[1]]
use_returns = subsys_returns_df
use_returns = use_returns[pd.datetime(1998,1,1):] # common timestamp
use_returns = use_returns.resample("5B").sum() # good enough approx for weekly returns
import matplotlib.pyplot as pyplot
pyplot.rcParams.update({'font.size': 16})
#get_rolling_correlations(use_returns, 52).plot()
Nweeks_list = [4, 7, 13, 26,52, 104, 208, 520]
import statsmodels.api as sm
r_squared = []
for Nweeks_back in Nweeks_list:
pd_result = get_forecast_and_future_corr(Nweeks_back, Nweeks_forward)
pd_result = pd_result.replace([np.inf, -np.inf], np.nan)
pd_result = pd_result.dropna()
x = pd_result.forecast
x = sm.add_constant(x)
y = pd_result.turnout
est = sm.OLS(y, x).fit()
r2 = est.rsquared_adj
ax = pyplot.gca()
ax.scatter(Nweeks_list, r_squared)
## for trading rules a bit more complex
Nweeks_forward = 52 # use 13 weeks for underyling returns, 52 for others
r_squared = []
for Nweeks_back in Nweeks_list:
all_r2 = []
for instrument_code in instrument_list:
print("%s/%s" % (instrument_code, Nweeks_back))
use_returns = get_rule_returns(instrument_code)
use_returns = use_returns[pd.datetime(1998, 1, 1):] # common timestamp
use_returns = use_returns.resample("5B").sum() # good enough approx for weekly returns
pd_result = get_forecast_and_future_corr(Nweeks_back, Nweeks_forward)
pd_result = pd_result.replace([np.inf, -np.inf], np.nan)
pd_result = pd_result.dropna()
x = pd_result.forecast
x = sm.add_constant(x)
y = pd_result.turnout
est = sm.OLS(y, x).fit()
r2_this_instr = est.rsquared_adj
r2 = np.mean(all_r2)
ax = pyplot.gca()
ax.scatter(Nweeks_list, r_squared)
