@robcarver17
Created February 22, 2021 15:44
import matplotlib
matplotlib.use("TkAgg")
import matplotlib.pyplot as plt
from scipy.optimize import minimize
import numpy as np
import scipy.stats as stats
from scipy.stats import norm
from collections import namedtuple
from syscore.optimisation_utils import optimise, sigma_from_corr_and_std
from syscore.correlations import get_avg_corr, boring_corr_matrix
FUDGE_FACTOR_FOR_CORR_WEIGHT_UNCERTAINTY = 4.0
### Following is the optimisation code:
### First the main function.
def optimise_with_sigma(sigma, mean_list):
    """
    Given mean and covariance matrix, find portfolio weights that maximise SR

    We assume we can have as much leverage as we like to hit a given risk target, hence we can
    constrain weights to sum to 1 i.e. no cash option

    :param sigma: np.array NxN, covariance matrix
    :param mean_list: list of floats N in length, means
    :return: list of weights
    """
    mus = np.array(mean_list, ndmin=2).transpose()
    number_assets = sigma.shape[1]

    # Starting weights, equal weighting
    start_weights = [1.0 / number_assets] * number_assets

    # Set up constraints - positive weights, adding to 1.0
    bounds = [(0.0, 1.0)] * number_assets
    cdict = [{'type': 'eq', 'fun': addem}]

    ans = minimize(
        neg_sharpe_ratio,
        start_weights, (sigma, mus),
        method='SLSQP',
        bounds=bounds,
        constraints=cdict,
        tol=0.00001)

    weights = ans['x']

    return weights
## This is the function we optimise over
def neg_sharpe_ratio(weights, sigma, mus):
    """
    Returns minus return / risk for a portfolio

    :param weights: list of floats N in length
    :param sigma: np.array NxN, covariance matrix
    :param mus: np.array Nx1, means
    :return: float
    """
    weights = np.matrix(weights)
    estreturn = (weights * mus)[0, 0]
    std_dev = variance(weights, sigma) ** .5

    # Minus the portfolio Sharpe Ratio (as we're minimising)
    return -estreturn / std_dev
## Portfolio standard deviation and variance given weights and covariance matrix sigma
def portfolio_stdev(weights, sigma):
    std_dev = variance(weights, sigma) ** .5
    return std_dev

def variance(weights, sigma):
    # Returns the variance (NOT standard deviation) given weights and sigma
    return (weights * sigma * weights.transpose())[0, 0]

def addem(weights):
    # Used for constraints: weights must sum to 1
    return 1.0 - sum(weights)
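## Quick sanity check of the optimiser (illustrative numbers, not from the slides):
## two uncorrelated assets with identical means but different risk. With zero
## correlation the maximum-SR weights are proportional to mean/variance, so the
## lower-volatility asset should get roughly 80% of the portfolio.
example_sigma = np.array([[0.10 ** 2, 0.0],
                          [0.0, 0.20 ** 2]])
example_weights = optimise_with_sigma(example_sigma, [0.05, 0.05])
print(example_weights)  # approximately [0.8, 0.2]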
# Useful for working in standard deviation and correlation space
# rather than covariance space
def optimise_with_corr_and_std(mean_list, stdev_list, corrmatrix):
    """
    Given mean, standard deviation and correlation matrix, find portfolio weights that maximise SR

    :param mean_list: list of N floats
    :param stdev_list: list of N floats
    :param corrmatrix: NxN np.array
    :return: list of floats
    """
    sigma = sigma_from_corr_and_std(stdev_list, corrmatrix)
    weights = optimise_with_sigma(sigma, mean_list)

    return weights
### Do all the examples in the slides
optimise_with_corr_and_std([0.04, 0.04, 0.04], [0.08, 0.08, 0.08], np.array([[1, 0.9, 0.9], [0.9, 1, 0.9], [0.9, 0.9, 1]]))
optimise_with_corr_and_std([0.04, 0.04, 0.04], [0.08, 0.08, 0.08], np.array([[1, 0.0, 0.0], [0.0, 1, 0.0], [0.0, 0.0, 1]]))
optimise_with_corr_and_std([0.04, 0.04, 0.04], [0.08, 0.08, 0.08], np.array([[1, 0.7, 0.0], [0.7, 1, 0.0], [0.0, 0.0, 1]]))
optimise_with_corr_and_std([0.04, 0.04, 0.04], [0.08, 0.08, 0.12], np.array([[1, 0.0, 0.0], [0.0, 1, 0.0], [0.0, 0.0, 1]]))
optimise_with_corr_and_std([0.04, 0.04, 0.04], [0.08, 0.08, 0.085], np.array([[1, 0.9, 0.9], [0.9, 1, 0.9], [0.9, 0.9, 1]]))
optimise_with_corr_and_std([0.04, 0.04, 0.045], [0.08, 0.08, 0.08], np.array([[1, 0.9, 0.9], [0.9, 1, 0.9], [0.9, 0.9, 1]]))
""" This code is used by me to get the data from my trading system. You can ignore it
from systems.provided.futures_chapter15.basesystem import futures_system
system = futures_system()
data=dict([(code,system.rawdata.get_percentage_returns(code)) for code in ["SP500", "US10", "US5"]])
import pandas as pd
data = pd.concat(data, axis=1)
data = data.resample("7D").sum()
data = data[pd.datetime(1998,1,1):]
"""
import pandas as pd
data = pd.read_csv("returns.csv")
data.index = data['index']
del(data['index'])
WEEKS_IN_YEAR = 365.25/7
## Some functions to analyse data and return various statistics
## Assumes weekly returns
def annual_returns_from_weekly_data(data):
    return data.mean() * WEEKS_IN_YEAR

def annual_stdev_from_weekly_data(data):
    return data.std() * (WEEKS_IN_YEAR ** .5)

def sharpe_ratio(data):
    SR = annual_returns_from_weekly_data(data) / annual_stdev_from_weekly_data(data)
    return SR
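## Worked example of the annualisation arithmetic (illustrative numbers, not from the data):
## a weekly mean of 0.1% and a weekly stdev of 1% annualise as follows.
example_annual_return = 0.001 * WEEKS_IN_YEAR             # ~0.052, i.e. 5.2% a year
example_annual_stdev = 0.01 * (WEEKS_IN_YEAR ** 0.5)      # ~0.072, i.e. 7.2% a year
example_SR = example_annual_return / example_annual_stdev # ~0.72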
def optimise_with_data(data):
    """
    Given some data, find the optimal SR portfolio

    :param data: pd.DataFrame containing N columns
    :return: list of N floats
    """
    mean_list = annual_returns_from_weekly_data(data).values
    stdev_list = annual_stdev_from_weekly_data(data).values
    corrmatrix = data.corr().values

    weights = optimise_with_corr_and_std(mean_list, stdev_list, corrmatrix)

    return weights
## Bootstrapping code
import random
## Do optimisation with some random data
def optimisation_with_random_bootstrap(data):
    bootstrapped_data = get_bootstrap_series(data)
    weights = optimise_with_data(bootstrapped_data)

    return weights

## Get some random data
def get_bootstrap_series(data):
    # Sample rows from the data with replacement, keeping the original length
    length_of_series = len(data.index)
    random_indices = [int(random.uniform(0, length_of_series)) for _unused in range(length_of_series)]
    bootstrap_data = data.iloc[random_indices]

    return bootstrap_data
def bootstrap_optimisation_distributions(data, monte_count=1000):
    """
    Bootstrap over data, returning the distribution of portfolio weights

    :param data: pd.DataFrame containing N columns
    :param monte_count: int, how many bootstraps to do
    :return: pd.DataFrame of weights, monte_count rows, same columns as data
    """
    dist_of_weights = []
    for i in range(monte_count):
        single_bootstrap_weights = optimisation_with_random_bootstrap(data)
        dist_of_weights.append(single_bootstrap_weights)

    dist_of_weights = pd.DataFrame(dist_of_weights)
    dist_of_weights.columns = data.columns

    return dist_of_weights
## Generate data for the slides
weight_distr = bootstrap_optimisation_distributions(data, monte_count=1000)
plt.hist(weight_distr.SP500, bins=100)
plt.hist(weight_distr.US10, bins=100)
plt.hist(weight_distr.US5, bins=100)
## Now for the plots showing the sensitivity of weights to a given factor, eg Sharpe Ratio
## First we need a function that allows us to estimate correlations for a given 'code' eg SP500/US10
def correlation_for_code(data):
    """
    Calculate correlation matrix and return as dict

    :param data: pd.DataFrame with column names eg x, y, z...
    :return: dict with keys eg 'x/y', 'y/z', 'x/z', 'y/x', 'z/y', 'z/x'... containing correlations
    """
    corr = data.corr()
    instruments = data.columns

    results_dict = {}
    for instr1 in instruments:
        for instr2 in instruments:
            code = join_corr_code(instr1, instr2)
            results_dict[code] = corr.loc[instr1, instr2]

    return results_dict

def split_corr_code(code):
    split_code = code.split("/")
    instr1 = split_code[0]
    instr2 = split_code[1]

    return instr1, instr2

def join_corr_code(instr1, instr2):
    return '%s/%s' % (instr1, instr2)
## Now we have functions that allow us to find the factor distribution for any factor
func_dict = dict(sharpe=sharpe_ratio, stdev=annual_stdev_from_weekly_data,
                 corr=correlation_for_code)
factor_list = list(func_dict.keys())
def plot_changeling(code, factor, data):
    """
    Top level function which gets the data, plots and analyses it

    :param code: str. Must be a column name in data, or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param data: pd.DataFrame
    :return: None
    """
    assert factor in factor_list

    # Get the data
    factor_distribution, weights_data = changeling_graph_data(code, factor, data)

    # Now make two plots
    fig, (ax1, ax2) = plt.subplots(2)
    fig.suptitle("Distribution of %s for %s, average %.2f" % (factor, code, np.mean(factor_distribution)))
    ax1.hist(factor_distribution, bins=50)
    weights_data.plot(ax=ax2)

    # Now analyse
    analyse_changeling_results(code, factor, factor_distribution, data)
def changeling_graph_data(code, factor, data):
    """
    Get the data required for the graphing; a factor distribution and the matched weights

    :param code: str. Must be a column name in data, or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param data: pd.DataFrame
    :return: tuple: factor_distribution and weights
    """
    factor_distribution = get_factor_distribution(code, factor, data)
    weights_data = get_weights_data(code, factor, data, factor_distribution)

    return factor_distribution, weights_data
def get_factor_distribution(code, factor, data, monte_length=10000):
    """
    Get the bootstrapped distribution of a given factor value

    :param code: str. Must be a column name in data, or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param data: pd.DataFrame
    :param monte_length: int, how many monte carlo runs to do
    :return: list of float
    """
    factor_func = func_dict[factor]

    factor_distr = []
    for _not_used in range(monte_length):
        bootstrap_data = get_bootstrap_series(data)
        factor_estimate = factor_func(bootstrap_data)
        factor_estimate_for_code = factor_estimate[code]
        factor_distr.append(factor_estimate_for_code)

    # Drop the most extreme 1% of values in each tail, which are probably outliers
    factor_distr.sort()
    drop_values = int(monte_length * .01)
    factor_distr = factor_distr[drop_values:-drop_values]

    return factor_distr
def get_weights_data(code, factor, data, factor_distribution, points_to_use=100):
    """
    Optimise weights, using the statistics in data, but replacing the estimate of factor for code
    by various points on the factor distribution.

    :param code: str. Must be a column name in data, or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param data: pd.DataFrame
    :param factor_distribution: list of float
    :param points_to_use: int, how many points on the factor distribution
    :return: pd.DataFrame of weights, same columns as data, length is points_to_use
    """
    factor_range = [np.min(factor_distribution), np.max(factor_distribution)]
    factor_step = (factor_range[1] - factor_range[0]) / points_to_use
    factor_values_to_test = np.arange(start=factor_range[0], stop=factor_range[1], step=factor_step)

    weight_results = []
    for factor_value_to_use in factor_values_to_test:
        weights = optimise_with_replaced_factor_value(data, code, factor, factor_value_to_use)
        weight_results.append(weights)

    weight_results = pd.DataFrame(weight_results)
    weight_results.columns = data.columns

    return weight_results
def optimise_with_replaced_factor_value(data, code, factor, factor_value_to_use):
    """
    Optimise weights, using the statistics in data, but replacing the estimate of factor for code
    by factor_value_to_use

    :param code: str. Must be a column name in data, or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param data: pd.DataFrame
    :param factor_value_to_use: float
    :return: list of floats, weights
    """
    mean_list, stdev_list, corrmatrix = replace_factor_value(data, code, factor, factor_value_to_use)
    weights = optimise_with_corr_and_std(mean_list, stdev_list, corrmatrix)

    return weights
def replace_factor_value(data, code, factor, factor_value_to_use):
    """
    Estimate statistics from data, but replacing the estimate of factor for code
    by factor_value_to_use

    :param code: str. Must be a column name in data, or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param data: pd.DataFrame
    :param factor_value_to_use: float
    :return: 3 tuple: mean_list, stdev_list, corrmatrix
    """
    # First standard deviation
    stdev_list = annual_stdev_from_weekly_data(data)
    if factor == "stdev":
        stdev_list[code] = factor_value_to_use
    stdev_list = stdev_list.values

    # Next Sharpe Ratio
    sharpe_list = sharpe_ratio(data)
    if factor == "sharpe":
        sharpe_list[code] = factor_value_to_use
    sharpe_list = sharpe_list.values

    # We don't set the mean directly; instead we derive it from the Sharpe Ratio
    mean_list = stdev_list * sharpe_list

    # Now correlations. Needs a fancy function
    corrmatrix = data.corr()
    if factor == "corr":
        corrmatrix = replace_corr_value(corrmatrix, code, factor_value_to_use)
    corrmatrix = corrmatrix.values

    return mean_list, stdev_list, corrmatrix
def replace_corr_value(corrmatrix, code, factor_value_to_use):
    """
    Given a correlation matrix, replace one value (actually two, because of symmetry)

    :param corrmatrix: NxN matrix with labels
    :param code: str 'A/B' where A and B are labels of the correlation matrix
    :param factor_value_to_use: float, to replace the A/B and B/A correlation with
    :return: new NxN correlation matrix
    """
    instr1, instr2 = split_corr_code(code)
    corrmatrix.loc[instr1, instr2] = factor_value_to_use
    corrmatrix.loc[instr2, instr1] = factor_value_to_use

    return corrmatrix
def analyse_changeling_results(code, factor, factor_distribution, data):
    """
    Prints some analysis of factor_distribution

    :param code: str. Must be a column name in data, or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param factor_distribution: list of float
    :param data: pd.DataFrame
    :return: None, just prints
    """
    factor5 = np.percentile(factor_distribution, 5)
    factor95 = np.percentile(factor_distribution, 95)
    print("There is a 90%% chance that %s for %s was between %.2f and %.2f" % (factor, code, factor5, factor95))

    weights5 = optimise_with_replaced_factor_value(data, code, factor, factor5)
    weights95 = optimise_with_replaced_factor_value(data, code, factor, factor95)

    ## Get the weights we need for code
    ## For correlations, return the weight of the first item in the pair 'A/B'
    if factor == "corr":
        code = split_corr_code(code)[0]

    # Weights are a list not a dict, so we need to know the position of the right weight
    instruments = list(data.columns)
    code_index = instruments.index(code)
    weight5_code = weights5[code_index]
    weight95_code = weights95[code_index]

    print("Giving weights for %s between %.3f and %.3f" % (code, weight5_code, weight95_code))
# Now do the plots in the slides
plot_changeling("SP500", "sharpe", data)
plot_changeling("US10", "sharpe", data)
plot_changeling("US5", "sharpe", data)
plot_changeling("SP500", "stdev", data)
plot_changeling("US10", "stdev", data)
plot_changeling("US5", "stdev", data)
plot_changeling("SP500/US10", "corr", data)
plot_changeling("SP500/US5", "corr", data)
plot_changeling("US5/US10", "corr", data)
## Let's do optimisation better!
## 1) Bootstrapped weights
weight_distr = bootstrap_optimisation_distributions(data, monte_count=1000)
weight_distr.mean()
## 2) Set some values to be identical
def optimise_with_identical_values(data, identical_SR=False, identical_stdev=False, identical_corr=False):
    if identical_stdev:
        stdev_list = get_identical_stdev(data)
    else:
        stdev_list = annual_stdev_from_weekly_data(data).values

    if identical_SR:
        ## We want means, but derive them from SR based on the standard deviations we have
        mean_list = get_means_assuming_identical_SR(data, stdev_list)
    else:
        mean_list = annual_returns_from_weekly_data(data).values

    if identical_corr:
        corr_matrix = get_identical_corr(data)
    else:
        corr_matrix = data.corr().values

    weights = optimise_with_corr_and_std(mean_list, stdev_list, corr_matrix)

    return weights
def get_identical_corr(data):
    ## Returns a boring correlation matrix whose off diagonal elements are equal to the average correlation
    instrument_count = len(data.columns)
    estimated_corr = data.corr().values
    avg_corr = get_avg_corr(estimated_corr)
    corrmatrix = boring_corr_matrix(instrument_count, offdiag=avg_corr)

    return corrmatrix

def get_identical_stdev(data):
    ## Returns a vector of standard deviations whose elements are the average standard deviation in the data
    estimated_stdev = annual_stdev_from_weekly_data(data)
    instrument_count = len(data.columns)
    average_stdev = estimated_stdev.mean()
    stdev_list = [average_stdev] * instrument_count

    return stdev_list

def get_means_assuming_identical_SR(data, using_stdev):
    ## Returns a vector of means assuming equal SR, identical to the average in the data,
    ## and using the standard deviations provided
    average_SR = sharpe_ratio(data).mean()
    mean_list = [stdev * average_SR for stdev in using_stdev]

    return mean_list
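## Example (illustrative, not one of the slides): assume identical Sharpe Ratios and
## identical standard deviations, so only the correlation estimates differ across
## assets. The resulting weights then reflect diversification alone.
weights_corr_only = optimise_with_identical_values(data, identical_SR=True, identical_stdev=True)
print(weights_corr_only)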
from copy import copy
## 3) Bayesian optimisation
def optimise_with_shrinkage_parameters(data, SR_shrinkage=0.0, corr_shrinkage=0.0, stdev_shrinkage=0.0):
    """
    Optimise using a mixture of estimates derived from the data and a prior of identical, average values

    :param data: pd.DataFrame
    :param SR_shrinkage: float, 0 = no shrinkage, just use the data; 1 = just use the prior (identical, average values)
    :param corr_shrinkage: float
    :param stdev_shrinkage: float
    :return: list of floats, weights
    """
    prior_stdev_list = np.array(get_identical_stdev(data))
    estimated_stdev_list = annual_stdev_from_weekly_data(data).values
    stdev_list = prior_stdev_list * stdev_shrinkage + estimated_stdev_list * (1 - stdev_shrinkage)

    prior_mean_list = np.array(get_means_assuming_identical_SR(data, stdev_list))
    estimated_mean_list = annual_returns_from_weekly_data(data).values
    mean_list = prior_mean_list * SR_shrinkage + estimated_mean_list * (1 - SR_shrinkage)

    prior_corr_matrix = get_identical_corr(data)
    estimated_corr_matrix = data.corr().values
    corr_matrix = prior_corr_matrix * corr_shrinkage + estimated_corr_matrix * (1 - corr_shrinkage)

    weights = optimise_with_corr_and_std(mean_list, stdev_list, corr_matrix)

    return weights
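## Example (illustrative shrinkage parameters, not a recommendation): shrink the Sharpe
## Ratios hard towards the identical prior, but trust the correlation and standard
## deviation estimates a bit more. With all three parameters set to 1.0 this collapses
## to equal weights; with all set to 0.0 it reproduces the raw optimisation.
shrunk_weights = optimise_with_shrinkage_parameters(data, SR_shrinkage=0.9, corr_shrinkage=0.5, stdev_shrinkage=0.5)
print(shrunk_weights)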
## 4) Optimisation through integration
def optimised_weights_given_correlation_uncertainty(corr_matrix, data_points, p_step=0.25):
    dist_points = np.arange(p_step, stop=(1 - p_step) + 0.000001, step=p_step)
    list_of_weights = []
    for conf1 in dist_points:
        for conf2 in dist_points:
            for conf3 in dist_points:
                conf_intervals = labelledCorrelations(conf1, conf2, conf3)
                weights = optimise_for_corr_matrix_with_uncertainty(corr_matrix, conf_intervals, data_points)
                list_of_weights.append(weights)

    array_of_weights = np.array(list_of_weights)
    average_weights = np.nanmean(array_of_weights, axis=0)

    return average_weights
labelledCorrelations = namedtuple("labelledCorrelations", 'ab ac bc')
def optimise_for_corr_matrix_with_uncertainty(corr_matrix, conf_intervals, data_points):
    labelled_correlations = extract_asset_pairwise_correlations_from_matrix(corr_matrix)
    labelled_correlation_points = calculate_correlation_points_from_tuples(labelled_correlations, conf_intervals, data_points)
    corr_matrix_at_distribution_point = three_asset_corr_matrix(labelled_correlation_points)
    weights = optimise_for_corr_matrix(corr_matrix_at_distribution_point)

    return weights
def extract_asset_pairwise_correlations_from_matrix(corr_matrix):
    ab = corr_matrix[0][1]
    ac = corr_matrix[0][2]
    bc = corr_matrix[1][2]

    return labelledCorrelations(ab=ab, ac=ac, bc=bc)

def calculate_correlation_points_from_tuples(labelled_correlations, conf_intervals, data_points):
    correlation_point_list = [get_correlation_distribution_point(corr_value, data_points, confidence_interval)
                              for corr_value, confidence_interval in
                              zip(labelled_correlations, conf_intervals)]
    labelled_correlation_points = labelledCorrelations(*correlation_point_list)

    return labelled_correlation_points

def get_correlation_distribution_point(corr_value, data_points, conf_interval):
    fisher_corr = fisher_transform(corr_value)
    point_in_fisher_units = get_fisher_confidence_point(fisher_corr, data_points, conf_interval)
    point_in_natural_units = inverse_fisher(point_in_fisher_units)

    return point_in_natural_units
def fisher_transform(corr_value):
    if corr_value >= 1.0:
        corr_value = 0.99999999999999
    elif corr_value <= -1.0:
        corr_value = -0.99999999999999

    return 0.5 * np.log((1 + corr_value) / (1 - corr_value))  # also arctanh

def get_fisher_confidence_point(fisher_corr, data_points, conf_interval):
    if conf_interval < 0.5:
        confidence_in_fisher_units = fisher_confidence(data_points, conf_interval)
        point_in_fisher_units = fisher_corr - confidence_in_fisher_units
    elif conf_interval > 0.5:
        confidence_in_fisher_units = fisher_confidence(data_points, 1 - conf_interval)
        point_in_fisher_units = fisher_corr + confidence_in_fisher_units
    else:
        point_in_fisher_units = fisher_corr

    return point_in_fisher_units

def fisher_confidence(data_points, conf_interval):
    data_point_root = fisher_stdev(data_points) * FUDGE_FACTOR_FOR_CORR_WEIGHT_UNCERTAINTY
    conf_point = get_confidence_point(conf_interval)

    return data_point_root * conf_point

def fisher_stdev(data_points):
    data_point_root = 1 / ((data_points - 3) ** .5)
    return data_point_root

def get_confidence_point(conf_interval):
    conf_point = norm.ppf(1 - (conf_interval / 2))

    return conf_point

def inverse_fisher(fisher_corr_value):
    return (np.exp(2 * fisher_corr_value) - 1) / (np.exp(2 * fisher_corr_value) + 1)
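## Worked example (illustrative numbers): with an estimated correlation of 0.6 and
## 100 weekly data points, the Fisher transform is arctanh(0.6) ~= 0.69 and the
## fudged standard error is 4 / sqrt(100 - 3) ~= 0.41. At the 25% distribution point
## the code uses norm.ppf(1 - 0.25/2) ~= 1.15, giving tanh(0.69 - 0.41 * 1.15) ~= 0.22,
## so correlation uncertainty is wide enough to move the weights materially.
print(get_correlation_distribution_point(0.6, 100, 0.25))  # roughly 0.22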
def three_asset_corr_matrix(labelled_correlations):
    """
    :return: np.array, 3x3 correlation matrix
    """
    ab = labelled_correlations.ab
    ac = labelled_correlations.ac
    bc = labelled_correlations.bc

    m = [[1.0, ab, ac], [ab, 1.0, bc], [ac, bc, 1.0]]
    m = np.array(m)

    return m

def optimise_for_corr_matrix(corr_matrix):
    ## Arbitrary identical means and standard deviations; only the correlations matter here
    mean_list = [.05] * 3
    std = [.1] * 3

    return optimise_with_corr_and_std(mean_list, std, corr_matrix)
def multiplier_from_relative_SR(relative_SR, avg_correlation, years_of_data):
    # Return a multiplier
    # 1 implies no adjustment required
    ratio = mini_bootstrap_ratio_given_SR_diff(
        relative_SR, avg_correlation, years_of_data
    )

    return ratio
def mini_bootstrap_ratio_given_SR_diff(
    SR_diff,
    avg_correlation,
    years_of_data,
    avg_SR=0.5,
    std=0.15,
    how_many_assets=2,
    p_step=0.01,
):
    """
    Do a parametric bootstrap of portfolio weights to tell you what the ratio should be between an asset which
    has a higher backtested SR (by SR_diff) versus another asset(s) with average Sharpe Ratio (avg_SR)

    All assets are assumed to have the same standard deviation and correlation

    :param SR_diff: Difference in performance in Sharpe Ratio (SR) units between one asset and the rest
    :param avg_correlation: Average correlation across the portfolio
    :param years_of_data: How many years of data do you have (can be a float for partial years)
    :param avg_SR: Should be realistic for your type of trading
    :param std: Standard deviation (doesn't affect results, just a scaling parameter)
    :param how_many_assets: How many assets in the imaginary portfolio
    :param p_step: Step size to go through in the CDF of the mean estimate
    :return: float, ratio of weight of asset with different SR to 1/n weight
    """
    dist_points = np.arange(p_step, stop=(1 - p_step) + 0.00000001, step=p_step)
    list_of_weights = [
        weights_given_SR_diff(
            SR_diff,
            avg_correlation,
            confidence_interval,
            years_of_data,
            avg_SR=avg_SR,
            std=std,
            how_many_assets=how_many_assets,
        )
        for confidence_interval in dist_points
    ]

    array_of_weights = np.array(list_of_weights)
    average_weights = np.nanmean(array_of_weights, axis=0)
    ratio_of_weights = weight_ratio(average_weights)

    if np.sign(ratio_of_weights - 1.0) != np.sign(SR_diff):
        # This shouldn't happen, and only occurs because weight distributions
        # get curtailed at zero
        return 1.0

    return ratio_of_weights
def weight_ratio(weights):
    """
    Return the ratio of the weight of the first asset to the 1/N (equal) weight

    :param weights: list of floats
    :return: float
    """
    one_over_N_weight = 1.0 / len(weights)
    weight_first_asset = weights[0]

    return weight_first_asset / one_over_N_weight
def weights_given_SR_diff(
    SR_diff,
    avg_correlation,
    confidence_interval,
    years_of_data,
    avg_SR=0.5,
    std=0.15,
    how_many_assets=2,
):
    """
    Return the optimised weights for an asset with an unusual SR, using a given
    confidence point for the estimate of its mean

    :param SR_diff: Difference between the SR and the average SR. 0.0 indicates same as average
    :param avg_correlation: Average correlation amongst assets
    :param confidence_interval: How confident are we about our mean estimate (i.e. cdf point)
    :param years_of_data: How long has this been going on (in years)
    :param avg_SR: Average SR to use for the other asset(s)
    :param std: Standard deviation to use
    :param how_many_assets: How many assets are we optimising over (I only consider 2, but let's keep it general)
    :return: list of weights
    """
    average_mean = avg_SR * std
    asset1_mean = (SR_diff + avg_SR) * std

    mean_difference = asset1_mean - average_mean

    # Work out what the mean is with the appropriate confidence
    confident_mean_difference = calculate_confident_mean_difference(
        std, years_of_data, mean_difference, confidence_interval, avg_correlation)

    confident_asset1_mean = confident_mean_difference + average_mean

    mean_list = [confident_asset1_mean] + [average_mean] * (how_many_assets - 1)

    weights = optimise_using_correlation(mean_list, avg_correlation, std)

    return list(weights)
def optimise_using_correlation(mean_list, avg_correlation, std):
    corr_matrix = boring_corr_matrix(len(mean_list), offdiag=avg_correlation)
    stdev_list = [std] * len(mean_list)
    sigma = sigma_from_corr_and_std(stdev_list, corr_matrix)

    return optimise(sigma, mean_list)
def calculate_confident_mean_difference(
    std, years_of_data, mean_difference, confidence_interval, avg_correlation
):
    omega_difference = calculate_omega_difference(std, years_of_data, avg_correlation)
    confident_mean_difference = stats.norm(mean_difference, omega_difference).ppf(confidence_interval)

    return confident_mean_difference

def calculate_omega_difference(std, years_of_data, avg_correlation):
    # Standard error of the difference between two correlated mean estimates
    omega_one_asset = std / (years_of_data) ** 0.5
    omega_variance_difference = 2 * (omega_one_asset ** 2) * (1 - avg_correlation)
    omega_difference = omega_variance_difference ** 0.5

    return omega_difference
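## Worked example (illustrative numbers): with std = 0.15, 10 years of data and an
## average correlation of 0.5, the standard error of a single mean estimate is
## 0.15 / sqrt(10) ~= 0.047, and the standard error of the difference is
## sqrt(2 * 0.047**2 * (1 - 0.5)) ~= 0.047 as well; higher correlation shrinks it.
print(calculate_omega_difference(0.15, 10, 0.5))  # roughly 0.047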
def norm_weights(list_of_weights):
    norm_weights = list(np.array(list_of_weights) / np.sum(list_of_weights))
    return norm_weights
def adjust_weights_for_SR(weights, SR_list, years_of_data, avg_correlation):
    """
    Adjust weights according to the heuristic method

    :param weights: list of float, starting weights
    :param SR_list: np.array or list of Sharpe Ratios
    :param years_of_data: float
    :param avg_correlation: float, average correlation across the portfolio
    :return: list of adjusted weights
    """
    assert len(weights) == len(SR_list)

    avg_SR = np.nanmean(SR_list)
    relative_SR_list = np.array(SR_list) - avg_SR
    multipliers = [
        float(multiplier_from_relative_SR(relative_SR, avg_correlation, years_of_data))
        for relative_SR in relative_SR_list
    ]

    new_weights = list(np.array(weights) * np.array(multipliers))

    norm_new_weights = norm_weights(new_weights)

    return norm_new_weights
corr_matrix = data.corr().values
corr_weights = optimised_weights_given_correlation_uncertainty(corr_matrix, len(data.index), p_step=0.1)
avg_correlation = get_avg_corr(corr_matrix)
WEEKS_IN_YEAR = 365.25 / 7.0
years_of_data = len(data.index)/WEEKS_IN_YEAR # weekly data
SR_list = list(sharpe_ratio(data).values)
## these are risk weights
adjusted_weights = adjust_weights_for_SR(
corr_weights, SR_list, years_of_data, avg_correlation
)