Skip to content

Instantly share code, notes, and snippets.

@twiecki
Created October 4, 2016 09:38
Show Gist options
  • Save twiecki/9f6d3d2bdfba8523a6b7055049c63e40 to your computer and use it in GitHub Desktop.
Save twiecki/9f6d3d2bdfba8523a6b7055049c63e40 to your computer and use it in GitHub Desktop.
import scipy.cluster.hierarchy as sch
import random
import numpy as np
import pandas as pd
def getIVP(cov, **kargs):
# Compute the inverse-variance portfolio
ivp = 1. / np.diag(cov)
ivp /= ivp.sum()
return ivp
def getClusterVar(cov,cItems):
# Compute variance per cluster
cov_=cov.loc[cItems,cItems] # matrix slice
w_=getIVP(cov_).reshape(-1,1)
cVar=np.dot(np.dot(w_.T,cov_),w_)[0,0]
return cVar
def getQuasiDiag(link):
# Sort clustered items by distance
link = link.astype(int)
sortIx = pd.Series([link[-1, 0], link[-1, 1]])
numItems = link[-1, 3] # number of original items
while sortIx.max() >= numItems:
sortIx.index = range(0, sortIx.shape[0] * 2, 2) # make space
df0 = sortIx[sortIx >= numItems] # find clusters
i = df0.index
j = df0.values - numItems
sortIx[i] = link[j, 0] # item 1
df0 = pd.Series(link[j, 1], index=i + 1)
sortIx = sortIx.append(df0) # item 2
sortIx = sortIx.sort_index() # re-sort
sortIx.index = range(sortIx.shape[0]) # re-index
return sortIx.tolist()
def getRecBipart(cov, sortIx):
# Compute HRP alloc
w = pd.Series(1, index=sortIx)
cItems = [sortIx] # initialize all items in one cluster
while len(cItems) > 0:
cItems = [i[j:k] for i in cItems for j, k in ((0, len(i) / 2),
(len(i) / 2, len(i))) if len(i) > 1] # bi-section
for i in xrange(0, len(cItems), 2): # parse in pairs
cItems0 = cItems[i] # cluster 1
cItems1 = cItems[i + 1] # cluster 2
cVar0 = getClusterVar(cov, cItems0)
cVar1 = getClusterVar(cov, cItems1)
alpha = 1 - cVar0 / (cVar0 + cVar1)
w[cItems0] *= alpha # weight 1
w[cItems1] *= 1 - alpha # weight 2
return w
def correlDist(corr):
# A distance matrix based on correlation, where 0<=d[i,j]<=1
# This is a proper distance metric
dist = ((1 - corr) / 2.)**.5 # distance matrix
return dist
def generateData(nObs, sLength, size0, size1, mu0, sigma0, sigma1F):
# Time series of correlated variables
# 1) generate random uncorrelated data
x = np.random.normal(mu0, sigma0, size=(nObs, size0))
# each row is a variable
# 2) create correlation between the variables
cols = [random.randint(0, size0 - 1) for i in xrange(size1)]
y = x[:, cols] + np.random.normal(0, sigma0 * sigma1F, size=(nObs, len(cols)))
x = np.append(x, y, axis=1)
# 3) add common random shock
point = np.random.randint(sLength, nObs - 1, size=2)
x[np.ix_(point, [cols[0], size0])] = np.array([[-.5, -.5], [2, 2]])
# 4) add specific random shock
point = np.random.randint(sLength, nObs - 1, size=2)
x[point, cols[-1]] = np.array([-.5, 2])
return x, cols
def getHRP(cov, corr):
# Construct a hierarchical portfolio
corr, cov = pd.DataFrame(corr), pd.DataFrame(cov)
dist = correlDist(corr)
link = sch.linkage(dist, 'single')
sortIx = getQuasiDiag(link)
sortIx = corr.index[sortIx].tolist()
# recover labels
hrp = getRecBipart(cov, sortIx)
return hrp.sort_index()
def getCLA(cov, **kargs):
# Compute CLA's minimum variance portfolio
mean = np.arange(cov.shape[0]).reshape(-1, 1)
# Not used by C portf
lB = np.zeros(mean.shape)
uB = np.ones(mean.shape)
cla = CLA(mean, cov, lB, uB)
cla.solve()
return cla.w[-1].flatten()
def hrpMC(numIters=10000, nObs=520, size0=5, size1=5, mu0=0, sigma0=1e-2,
sigma1F=.25, sLength=260, rebal=22):
# Monte Carlo experiment on HRP
methods = {'getHRP': getHRP, 'getIVP': getIVP, 'getCLA': getCLA}
stats = {k: pd.Series() for k in methods.keys()}
pointers = range(sLength, nObs, rebal)
for numIter in xrange(int(numIters)):
# print numIter
# 1) Prepare data for one experiment
x, cols = generateData(nObs, sLength, size0,
size1, mu0, sigma0, sigma1F)
r = pd.DataFrame(columns=[methods.keys()],
index=range(sLength, nObs))#{i.__name__: pd.Series() for i in methods}
#print r
# 2) Compute portfolios in-sample
for pointer in pointers:
x_ = x[pointer - sLength:pointer]
cov_ = np.cov(x_, rowvar=0)
corr_ = np.corrcoef(x_, rowvar=0)
# 3) Compute performance out-of-sample
x_ = x[pointer:pointer + rebal]
for name, func in methods.iteritems():
w_ = func(cov=cov_, corr=corr_)
# callback
#r_ = pd.Series(np.dot(x_, w_))
#print r[name].append(r_)
#print pointer
r.loc[pointer:pointer + rebal - 1, name] = np.dot(x_, w_)
# 4) Evaluate and store results
for name, func in methods.iteritems():
r_ = r[name].reset_index(drop=True)
p_ = (1 + r_).cumprod()
stats[name].loc[numIter] = p_.iloc[-1] - 1 # terminal return
# 5) Report results
stats = pd.DataFrame.from_dict(stats, orient='columns')
# stats.to_csv('stats.csv')
df0, df1 = stats.std(), stats.var()
print pd.concat([df0, df1, df1 / df1['getHRP'] - 1], axis=1)
return stats
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment