Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
#import matplotlib
#matplotlib.use("TkAgg")
import matplotlib.pyplot as plt
from scipy.optimize import minimize
import numpy as np
### Following is the optimisation code:
### First the main function.
def optimise_with_sigma(sigma, mean_list):
"""
Given mean and covariance matrix, find portfolio weights that maximise SR
We assume we can have as much leverage as we like to hit a given risk target, hence we can
constrain weights to sum to 1 i.e. no cash option
:param sigma: np.array NxN, covariance matrix
:param mean_list: list of floats N in length, means
:return: list of weights
"""
mus = np.array(mean_list, ndmin=2).transpose()
number_assets = sigma.shape[1]
# Starting weights, equal weighting
start_weights = [1.0 / number_assets] * number_assets
# Set up constraints - positive weights, adding to 1.0
bounds = [(0.0, 1.0)] * number_assets
cdict = [{'type': 'eq', 'fun': addem}]
ans = minimize(
neg_sharpe_ratio,
start_weights, (sigma, mus),
method='SLSQP',
bounds=bounds,
constraints=cdict,
tol=0.00001)
weights = ans['x']
return weights
## This is the function we optimise over
def neg_sharpe_ratio(weights, sigma, mus):
"""
Returns minus return /risk for a portfolio
:param weights: list of floats N in length
:param sigma: np.array NxN, covariance matrix
:param mus: np.array Nx1 in length, means
:return: float
"""
weights = np.matrix(weights)
estreturn = (weights * mus)[0, 0]
std_dev = (variance(weights, sigma)**.5)
# Returns minus the portfolio Sharpe Ratio (as we're minimising)
return -estreturn / std_dev
## Portfolio standard deviation and variance given weights and covariance matrix sigma
def portfolio_stdev(weights, sigma):
std_dev = (variance(weights, sigma)**.5)
return std_dev
def variance(weights, sigma):
# returns the variance (NOT standard deviation) given weights and sigma
return (weights * sigma * weights.transpose())[0, 0]
def addem(weights):
# Used for constraints, weights must sum to 1
return 1.0-sum(weights)
# Useful for working in standard deviation and correlation space
# rather than covariance space
def optimise_with_corr_and_std( mean_list,stdev_list, corrmatrix):
"""
Given mean, standard deviation and correlation matrix, find portfolio weights that maximise SR
:param mean_list: list of N floats
:param stdev_list: list of N floats
:param corrmatrix: NxN np.array
:return: list of floats
"""
sigma = sigma_from_corr_and_std(stdev_list, corrmatrix)
weights = optimise_with_sigma(sigma, mean_list)
return weights
def sigma_from_corr_and_std(stdev_list, corrmatrix):
"""
Translates standard deviation and correlation matrix into covariance matrix
:param stdev_list: list of float
:param corrmatrix: NxN np.array
:return: covariance matrix, NxN np.array
"""
stdev = np.array(stdev_list, ndmin=2).transpose()
sigma = stdev * corrmatrix * stdev
return sigma
### Do all the examples in the slides
optimise_with_corr_and_std([0.04,0.04,0.04], [.08,.08,0.08], np.array([[1,.9,.9],[.9,1,.9],[.9,.9,1]]))
optimise_with_corr_and_std( [0.04,0.04,0.04], [.08,.08,0.08],np.array([[1,0.0,0.0],[0.0,1,0.0],[0.0,0.0,1]]))
optimise_with_corr_and_std( [0.04,0.04,0.04],[.08,.08,0.08], np.array([[1,0.7,0.0],[0.7,1,0.0],[0.0,0.0,1]]))
optimise_with_corr_and_std([.04,.04,0.04], [0.08,0.08,0.12], np.array([[1,0.0,0.0],[0.0,1,0.0],[0.0,0.0,1]]))
optimise_with_corr_and_std([.04,.04,0.04], [0.08,0.08,0.085], np.array([[1,.9,.9],[.9,1,.9],[.9,.9,1]]))
optimise_with_corr_and_std([.04,.04,0.045], [0.08,0.08,0.08], np.array([[1,.9,.9],[.9,1,.9],[.9,.9,1]]))
""" This code is used by me to get the data from my trading system. You can ignore it
from systems.provided.futures_chapter15.basesystem import futures_system
system = futures_system()
data=dict([(code,system.rawdata.get_percentage_returns(code)) for code in ["SP500", "US10", "US5"]])
import pandas as pd
data = pd.concat(data, axis=1)
data = data.resample("7D").sum()
data = data[pd.datetime(1998,1,1):]
"""
import pandas as pd
data = pd.read_csv("returns.csv")
WEEKS_IN_YEAR = 365.25/7
## Some functions to analyse data and return various statistics
## Assumes weekly returns
def annual_returns_from_weekly_data(data):
return data.mean()*WEEKS_IN_YEAR
def annual_stdev_from_weekly_data(data):
return data.std()*(WEEKS_IN_YEAR**.5)
def sharpe_ratio(data):
SR = annual_returns_from_weekly_data(data)/annual_stdev_from_weekly_data(data)
return SR
def optimise_with_data(data):
"""
Given some data, find the optimal SR portfolio
:param data: np.DataFrame containing N columns
:return: list of N floats
"""
mean_list = annual_returns_from_weekly_data(data).values
stdev_list = annual_stdev_from_weekly_data(data).values
corrmatrix = data.corr().values
weights = optimise_with_corr_and_std(mean_list, stdev_list, corrmatrix)
return weights
## Bootstrapping code
import random
## Do optimisation with some random data
def optimisation_with_random_bootstrap(data):
bootstrapped_data = get_bootstrap_series(data)
weights = optimise_with_data(bootstrapped_data)
return weights
## Get some random data
def get_bootstrap_series(data):
length_of_series = len(data.index)
random_indices = [int(random.uniform(0,length_of_series)) for _unused in range(length_of_series)]
bootstrap_data = data.iloc[random_indices]
return bootstrap_data
def bootstrap_optimisation_distributions(data, monte_count=1000):
"""
Bootstrap over data, returning distribution of portfolio weights
:param data: np.DataFrame containing N columns
:param monte_count: int, how many bootstraps to do
:return: list of N floats
"""
dist_of_weights = []
for i in range(monte_count):
single_bootstrap_weights = optimisation_with_random_bootstrap(data)
dist_of_weights.append(single_bootstrap_weights)
dist_of_weights = pd.DataFrame(dist_of_weights)
dist_of_weights.columns = data.columns
return dist_of_weights
## Generate data for the slides
weight_distr = bootstrap_optimisation_distributions(data, monte_count=1000)
plt.hist(weight_distr.SP500, bins=100)
plt.hist(weight_distr.US10, bins=100)
plt.hist(weight_distr.US5, bins=100)
## Now for the plots showing the sensitivity of weights to a given factor, eg Sharpe Ratio
## First we need a function that allows us to estimate correlations for a given 'code' eg SP500/US10
def correlation_for_code(data):
"""
Calculate correlation matrix and return as dict
:param data: pd.DataFrame with column names eg xyz...
:return: dict with keys eg x/y, y/z, x/z, y/x, z/y, z/x ... containing correlations
"""
corr = data.corr()
instruments = data.columns
results_dict = {}
for instr1 in instruments:
for instr2 in instruments:
code = join_corr_code(instr1, instr2)
results_dict[code] = corr.loc[instr1, instr2]
return results_dict
def split_corr_code(code):
split_code = code.split("/")
instr1 = split_code[0]
instr2 = split_code[1]
return instr1, instr2
def join_corr_code(instr1, instr2):
return '%s/%s' % (instr1, instr2)
## Now we have functions that allow us to find the factor distribution for any factor
func_dict = dict(sharpe=sharpe_ratio, stdev = annual_stdev_from_weekly_data,
corr = correlation_for_code)
factor_list = list(func_dict.keys())
def plot_changeling(code, factor,data):
"""
Top level function which gets the data, plots and analyses it
:param code: str. Must be a column name in data or 'A/B' if correlations are used
:param factor: str. Must be in factor_list
:param data: pd.DataFrame
:return: None
"""
assert factor in factor_list
# Get the data
factor_distribution, weights_data = changeling_graph_data(code, factor, data)
# Now make two plots
fig, (ax1, ax2) = plt.subplots(2)
fig.suptitle("Distribution of %s for %s, average %.2f" % (factor, code, np.mean(factor_distribution)))
ax1.hist(factor_distribution, bins=50)
weights_data.plot(ax=ax2)
# Now analyse
analyse_changeling_results(code, factor, factor_distribution, data)
def changeling_graph_data(code, factor, data):
"""
Get the data required for the graphing; a factor distribution and the matched weights
:param code: str. Must be a column name in data or 'A/B' if correlations are used
:param factor: str. Must be in factor_list
:param data: pd.DataFrame
:return: tuple: factor_distribution and weights
"""
factor_distribution = get_factor_distribution(code, factor, data)
weights_data = get_weights_data(code, factor, data, factor_distribution)
return factor_distribution, weights_data
def get_factor_distribution(code, factor, data, monte_length=10000):
"""
Get the bootstrapped distribution of a given factor value
:param code: str. Must be a column name in data or 'A/B' if correlations are used
:param factor: str. Must be in factor_list
:param data: pd.DataFrame
:param monte_length: int, how many monte carlo runs to do
:return: list of float
"""
factor_func = func_dict[factor]
factor_distr = []
for _not_used in range(monte_length):
bootstrap_data = get_bootstrap_series(data)
factor_estimate = factor_func(bootstrap_data)
factor_estimate_for_code = factor_estimate[code]
factor_distr.append(factor_estimate_for_code)
# note we drop the outlying values which are probably extreme
factor_distr.sort()
drop_values = int(monte_length*.01)
factor_distr = factor_distr[drop_values:-drop_values]
return factor_distr
def get_weights_data(code, factor, data, factor_distribution, points_to_use=100):
"""
Optimise weights, using the statistics in data, but replacing the estimate of factor for code
by various points on the factor distribution.
:param code: str. Must be a column name in data or 'A/B' if correlations are used
:param factor: str. Must be in factor_list
:param data: pd.DataFrame
:param factor_distribution: list of float
:param points_to_use: int, how many points on the factor distribution
:return: pd.DataFrame of weights, same columns as data, length is points_to_use
"""
factor_range = [np.min(factor_distribution), np.max(factor_distribution)]
factor_step = (factor_range[1] - factor_range[0])/points_to_use
factor_values_to_test = np.arange(start = factor_range[0], stop = factor_range[1], step = factor_step)
weight_results = []
for factor_value_to_use in factor_values_to_test:
weights = optimise_with_replaced_factor_value(data, code, factor, factor_value_to_use)
weight_results.append(weights)
weight_results = pd.DataFrame(weight_results)
weight_results.columns = data.columns
return weight_results
def optimise_with_replaced_factor_value(data, code, factor, factor_value_to_use):
"""
Optimise weights, using the statistics in data, but replacing the estimate of factor for code
by factor_value_to_use
:param code: str. Must be a column name in data or 'A/B' if correlations are used
:param factor: str. Must be in factor_list
:param data: pd.DataFrame
:param factor_value_to_use: float
:return: list of floats, weights
"""
mean_list, stdev_list, corrmatrix = replace_factor_value(data, code, factor, factor_value_to_use)
weights = optimise_with_corr_and_std(mean_list, stdev_list, corrmatrix)
return weights
def replace_factor_value(data, code, factor, factor_value_to_use):
"""
Estimate statistics from data, but replacing the estimate of factor for code
by factor_value_to_use
:param code: str. Must be a column name in data or 'A/B' if correlations are used
:param factor: str. Must be in factor_list
:param data: pd.DataFrame
:param factor_value_to_use: float
:return: 3 tuple mean_list, stdev_list, corrmatrix
"""
# First standard deviation
stdev_list = annual_stdev_from_weekly_data(data)
if factor=="stdev":
stdev_list[code] = factor_value_to_use
stdev_list = stdev_list.values
# Next Sharpe Ratio
sharpe_list = sharpe_ratio(data)
if factor=="sharpe":
sharpe_list[code] = factor_value_to_use
sharpe_list = sharpe_list.values
# We don't derive mean directly instead we derive it from Sharpe Ratios
mean_list = stdev_list * sharpe_list
# Now correlations. Needs a fancy function
corrmatrix = data.corr()
if factor=="corr":
corrmatrix = replace_corr_value(corrmatrix, code, factor_value_to_use)
corrmatrix = corrmatrix.values
return mean_list, stdev_list, corrmatrix
def replace_corr_value(corrmatrix, code, factor_value_to_use):
"""
Given a correlation matrix, replace one value (actually two because of symettry)
:param corrmatrix: NxN matrix with labels
:param code: str 'A/B' where A and N are labels of correlation matrix
:param factor_value_to_use: float, to replace A/B and B/A correlation with
:return: new NxN correlation matrix
"""
instr1, instr2 = split_corr_code(code)
corrmatrix.loc[instr1, instr2] = factor_value_to_use
corrmatrix.loc[instr2, instr1] = factor_value_to_use
return corrmatrix
def analyse_changeling_results(code, factor, factor_distribution, data):
"""
Prints some analysis of factor_distribution
:param code: str. Must be a column name in data or 'A/B' if correlations are used
:param factor: str. Must be in factor_list
:param factor_distribution: list of float
:return: None, just prints
"""
factor5 = np.percentile(factor_distribution, 5)
factor95 = np.percentile(factor_distribution, 95)
print("There is a 90%% chance that %s for %s was between %.2f and %.2f" % (factor, code, factor5, factor95))
weights5 = optimise_with_replaced_factor_value(data, code, factor, factor5)
weights95 = optimise_with_replaced_factor_value(data, code, factor, factor95)
## Get the weights we need for code
## For correlations return the weights of the first item in the pair 'A/B'
if factor=="corr":
code = split_corr_code(code)[0]
# Weights are list not dict, so need to know position of right weight
instruments = list(data.columns)
code_index = instruments.index(code)
weight5_code = weights5[code_index]
weight95_code = weights95[code_index]
print("Giving weights for %s between %.3f and %.3f" % (code, weight5_code, weight95_code))
# Now do the plots in the slides
plot_changeling("SP500", "sharpe", data)
plot_changeling("US10", "sharpe", data)
plot_changeling("US5", "sharpe", data)
plot_changeling("SP500", "stdev", data)
plot_changeling("US10", "stdev", data)
plot_changeling("US5", "stdev", data)
plot_changeling("SP500/US10", "corr", data)
plot_changeling("SP500/US5", "corr", data)
plot_changeling("US5/US10", "corr", data)
## Let's do optimisation better!
## 1) Bootstrapped weights
weight_distr = bootstrap_optimisation_distributions(data, monte_count=1000)
weight_distr.mean()
## 2) Set some values to be identical
def optimise_with_identical_values(data, identical_SR=False, identical_stdev=False, identical_corr=False):
if identical_stdev:
stdev_list = get_identical_stdev(data)
else:
stdev_list = annual_stdev_from_weekly_data(data).values
if identical_SR:
## We want means, but derive them from SR based on the standard deviations we have
mean_list = get_means_assuming_identical_SR(data, stdev_list)
else:
mean_list = annual_returns_from_weekly_data(data).values
if identical_corr:
corr_matrix = get_identical_corr(data)
else:
corr_matrix = data.corr().values
weights = optimise_with_corr_and_std(mean_list, stdev_list, corr_matrix)
return weights
def get_identical_corr(data):
## Returns a boring correlation matrix whose off diagonal elements are equal to the average correlation
instrument_count = len(data.columns)
estimated_corr = data.corr().values
avg_corr = get_avg_corr(estimated_corr)
corrmatrix = boring_corr_matrix(instrument_count, offdiag=avg_corr)
return corrmatrix
def get_identical_stdev(data):
## Returns a vector of standard deviations whose elements are the average standard deviation in the data
estimated_stdev = annual_stdev_from_weekly_data(data)
instrument_count = len(data.columns)
average_stdev = estimated_stdev.mean()
stdev_list = [average_stdev]*instrument_count
return stdev_list
def get_means_assuming_identical_SR(data, using_stdev):
## Returns a vector of means assuming equal SR, identical to the average in the data
# and using the standard deviations provided
average_SR = sharpe_ratio(data).mean()
mean_list = [stdev*average_SR for stdev in using_stdev]
return mean_list
from copy import copy
def boring_corr_matrix(size, offdiag=0.99, diag=1.0):
"""
Create a boring correlation matrix
:param size: dimensions
:param offdiag: value to put in off diagonal
:param diag: value to put in diagonal
:return: np.array 2 dimensions, size
"""
size_index = range(size)
def _od(i, j, offdiag, diag):
if i == j:
return diag
else:
return offdiag
m = [[_od(i, j, offdiag, diag) for i in size_index] for j in size_index]
m = np.array(m)
return m
def get_avg_corr(sigma):
## Get the average correlation value in a correlation matrix
new_sigma = copy(sigma)
np.fill_diagonal(new_sigma, np.nan)
if np.all(np.isnan(new_sigma)):
return np.nan
avg_corr = np.nanmean(new_sigma)
return avg_corr
## 3) Bayesian optimisation
def optimise_with_shrinkage_parameters(data, SR_shrinkage=0.0, corr_shrinkage=0.0, stdev_shrinkage=0.0):
"""
Optimise using a mixture of data derived
:param data: pd.DataFrame
:param SR_shrinkage: float, 0= no shrinkage just use the data, 1=just use the prior (identical, average values)
:param corr_shrinkage: float
:param stdev_shrinkage: float
:return: list of floats, weights
"""
prior_stdev_list = np.array(get_identical_stdev(data))
estimated_stdev_list = annual_stdev_from_weekly_data(data).values
stdev_list = prior_stdev_list*stdev_shrinkage + estimated_stdev_list*(1-stdev_shrinkage)
prior_mean_list = np.array(get_means_assuming_identical_SR(data, stdev_list))
estimated_mean_list = annual_returns_from_weekly_data(data).values
mean_list = prior_mean_list*SR_shrinkage + estimated_mean_list*(1-SR_shrinkage)
prior_corr_matrix = get_identical_corr(data)
estimated_corr_matrix = data.corr().values
corr_matrix = prior_corr_matrix*corr_shrinkage + estimated_corr_matrix*(1-corr_shrinkage)
weights = optimise_with_corr_and_std(mean_list, stdev_list, corr_matrix)
return weights
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.