Skip to content

Instantly share code, notes, and snippets.

@Doggie52
Last active August 22, 2021 12:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Doggie52/c5f2ab250a80d3262e8021c52e9f371b to your computer and use it in GitHub Desktop.
Save Doggie52/c5f2ab250a80d3262e8021c52e9f371b to your computer and use it in GitHub Desktop.
Financial portfolio optimizer (Sharpe/Vol/CVaR) with Bayesian shrinkage and filtering
def GenerateOptimalPortfolioWeights(returnsDf):
###########################################
# GENERATE MATRICES
# - CALCULATING RETURNS, VOLS AND DVOLS
# Populate annual returns
assetsReturn = []
for assetName in returnsDf:
assetReturn = returnsDf[assetName].add(1).product() - 1
# ** (1 / ((returnsDf.index.max() - returnsDf.index.min()) / dt.timedelta(days=365))) - 1
assetsReturn.append(assetReturn)
# Turn into matrix
assetsReturn = np.matrix(assetsReturn).T
# Populate annualised volatilities
assetsVol = []
for assetName in returnsDf:
var = np.average(np.log(returnsDf[assetName].dropna().add(1))**2)
if var < 1e-10:
var = 1e2
assetsVol.append(np.sqrt(var))
# Turn into matrix
assetsVol = np.matrix(assetsVol).T
# - CALCULATING CVARS BY BOOTSTRAPPING
# Calculates the CVaR at alpha for given returns through bootstrapping
def CalculateAssetCVar(returns, alpha, iters, forecastPeriod):
# Is our forecastPeriod greater than the number of returns?
scale = 1.0
if forecastPeriod > len(returns):
scale = forecastPeriod / len(returns)
forecastPeriod = len(returns)
samples = []
# Initiate random sequence of ints to select entries in our returns
seq = np.random.randint(0, forecastPeriod, forecastPeriod)
for i in range(iters):
buff = i*forecastPeriod % len(returns)
if buff + forecastPeriod >= len(returns):
seq = np.random.randint(0, forecastPeriod, forecastPeriod)
buff = 0
samples.append(returns[seq+buff])
samples = np.add(samples, 1.0)
# Calculate p&l
pnls = np.subtract(np.prod(samples, axis=1), 1.0)
# Calculate VaR
var = np.percentile(pnls, 1-alpha)
# Calculate CVaR as arithmetic mean of all PnLs below the VaR
return -scale * np.mean([pnl for pnl in pnls if pnl <= var])
# Populate CVaRs
assetsCVar = []
for assetName in returnsDf:
cvar = CalculateAssetCVar(
returnsDf[assetName].dropna().values, 0.99, int(1e5), 24 * 30)
if cvar < 1e-10:
cvar = 1e2
assetsCVar.append(cvar)
# Turn into matrix
assetsCVar = np.matrix(assetsCVar).T
# - CALCULATING CORRELATION
# Generate pairwise correlation matrix (assumes date ranges are identical)
# We're only using the rows where all assets have returns
assetsCorr = np.matrix(returnsDf.dropna().corr())
assetsCorr = np.nan_to_num(assetsCorr)
###########################################
# APPLY FILTERING
# - FILTER FOR MOMENTUM, APPLY AS WEIGHT BOUND
# Get return threshold & bounds
threshold = np.percentile(assetsReturn[assetsReturn > 0], 0, axis=1)
weightBounds = [(0, 1) if r >= threshold else (0, 0) for r in assetsReturn]
initialGuess = [1/len(assetsReturn >= threshold) if r >= threshold else 0 for r in assetsReturn]
# Get Sharpe threshold & bounds
# assetsSharpe = assetsReturn / assetsVol
# threshold = np.percentile(assetsSharpe[assetsSharpe > 0], 75, axis=1)
# weightBounds = [(0, 1) if s > threshold else (0, 0) for s in assetsSharpe]
# initialGuess = [1/len(assetsSharpe > threshold) if r > threshold else 0 for r in assetsSharpe]
# Get vol threshold & bounds
# threshold = np.percentile(assetsVol, 10)
# weightBounds = [(0, 1) if v < threshold else (0, 0) for v in assetsVol]
# initialGuess = [1/len(assetsVol < threshold) if v < threshold else 0 for v in assetsVol]
# Setup standard bounds for weights ($ 0 \leq w \leq 1$)
# weightBounds = [(0, 1) for a in returnsDf]
# initialGuess = [1/len(returnsDf) for a in returnsDf]
print("Threshold: ", threshold)
# return initialGuess
###########################################
# APPLY BAYESIAN SHRINKAGE
# - CALCULATE SHRUNK RETURNS, VOLS, DVOLS, CVARS AND CORRS
# Define Bayesian shrinkage function returning the posterior
# checkESign - whether or not the prior should depend on the sign of the estimate
# E.g. when dealing with correlations, we want the prior to depend on whether
# the estimated correlation is positive or negative
def ShrinkBayesian(e, p, w, checkESign=False):
if (checkESign):
p *= np.sign(e)
return (1-w)*e + w*p
# Define Bayesian priors
pReturn = np.mean(assetsReturn)
pVol = np.mean(assetsVol)
pCVar = np.mean(assetsCVar)
pCorr = 0.5
# Define Bayesian shrinkage factors
wReturn = 0.95 # 0.95
wVol = 0.0 # 0.0
wCVar = 0.0
wCorr = 0.5 # 0.5
# Adjust matrices
assetsReturn = ShrinkBayesian(assetsReturn, pReturn, wReturn)
assetsVol = ShrinkBayesian(assetsVol, pVol, wVol)
assetsCVar = ShrinkBayesian(assetsCVar, pCVar, wCVar)
assetsCorr = ShrinkBayesian(assetsCorr, pCorr, wCorr, checkESign=True)
# Let's not forget to reset the diagonals...
np.fill_diagonal(assetsCorr, 1.0)
# - CALCULATING SHRUNK COVS (VOL, DVOL AND CVAR)
# Let's go back to covariance as we need it for the optimisation
# Convert correlation matrix back to a covariance one
# We use the annualised standard deviations here
# https://blogs.sas.com/content/iml/2010/12/10/converting-between-correlation-and-covariance-matrices.html
assetsVolCov = np.diag(assetsVol.A1) * assetsCorr * np.diag(assetsVol.A1)
# Also create one for CVaR
assetsCVarCov = np.diag(assetsCVar.A1) * \
assetsCorr * np.diag(assetsCVar.A1)
###########################################
# OPTIMISATION BY MINIMISING A GOAL FUNCTION
# Define goal function
def PortfolioOptimisationGoal(weights, returns, vols, volCovs, cVarCovs):
# Calculate portfolio return
pReturn = weights * returns
# Calculate portfolio vol
pVol = np.sqrt(np.dot(weights * volCovs, weights.T))
# Calculate portfolio sharpe
pSharpe = pReturn / pVol
# Calculate portfolio CVaR
pCVar = np.sqrt(np.dot(weights * cVarCovs, weights.T))
# Define goal to *minimise*
# (WATCH OUT IF USING CVAR)
return -pSharpe
# Setup constraints
# solverConstraints = ({"type": "ineq", "fun": lambda x: 1 - np.sum(x)})
solverConstraints = ({"type": "eq", "fun": lambda x: np.sum(x) - 1})
# Optimise to find the minimum
optimisationResult = optimize.minimize(fun = PortfolioOptimisationGoal, x0 = initialGuess, bounds = weightBounds,
constraints = solverConstraints, args = (
assetsReturn, assetsVol, assetsVolCov, assetsCVarCov),
options = {})
if optimisationResult.status is 9:
return initialGuess
else:
return optimisationResult.x
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment