Skip to content

Instantly share code, notes, and snippets.

@jaymon0703
Last active Jul 30, 2021
Embed
What would you like to do?
Hierarchical Risk Parity implementation in Python
# Python 3 code
import random

import matplotlib.pyplot as mpl
import numpy as np
import pandas as pd
import scipy.cluster.hierarchy as sch
from scipy.spatial.distance import squareform
# Load the correlation and covariance matrices generated in R, so the output
# of Marcos' Python implementation can be checked against the R results.
col_list = ["IGV.Close", "TLT.Close", "IAU.Close", "IYR.Close"]
corr = pd.read_csv("corMat_23072021.csv", usecols=col_list)
cov = pd.read_csv("covMat_23072021.csv", usecols=col_list)
#------------------------------------------------------------------------------
def getIVP(cov, **kargs):
    """Return inverse-variance portfolio weights for covariance matrix `cov`.

    Each asset is weighted by the reciprocal of its variance (the diagonal
    of `cov`), normalised so the weights sum to one.
    """
    inv_var = 1. / np.diag(cov)
    return inv_var / inv_var.sum()
#------------------------------------------------------------------------------
def getClusterVar(cov, cItems):
    """Return the variance of cluster `cItems` under inverse-variance weights.

    Slices the covariance DataFrame down to the cluster members, computes
    IVP weights for that sub-matrix, and evaluates w' C w.
    """
    sub_cov = cov.loc[cItems, cItems]  # covariance restricted to the cluster
    weights = getIVP(sub_cov).reshape(-1, 1)
    return np.dot(np.dot(weights.T, sub_cov), weights)[0, 0]
#------------------------------------------------------------------------------
def plotCorrMatrix(path, corr, labels=None):
    """Save a heatmap of the correlation matrix `corr` to the file `path`."""
    if labels is None:
        labels = []
    mpl.pcolor(corr)
    mpl.colorbar()
    tick_pos = np.arange(.5, corr.shape[0] + .5)  # centre ticks on the cells
    mpl.yticks(tick_pos, labels)
    mpl.xticks(tick_pos, labels)
    mpl.savefig(path)
    mpl.clf()
    mpl.close()  # reset pylab so later plots start from a clean figure
    return
# Plot the heatmap of the correlation matrix before any reordering.
plotCorrMatrix('HRP3_corr0.png',corr,labels=corr.columns)
#3) cluster
def correlDist(corr):
    """Map a correlation matrix to a distance matrix.

    Uses d = sqrt((1 - rho) / 2), a proper distance metric satisfying
    0 <= d[i, j] <= 1.
    """
    return np.sqrt((1. - corr) / 2.)
#------------------------------------------------------------------------------
# Cluster the assets on the correlation-based distance matrix.
# NOTE: linkage() interprets a square 2-D array as an OBSERVATION matrix and
# would compute distances between its rows rather than using the matrix as
# distances, so the square distance matrix must first be condensed with
# squareform(). checks=False tolerates tiny float asymmetries from the CSV.
dist = correlDist(corr)
link = sch.linkage(squareform(dist, checks=False), 'single')
def getQuasiDiag(link):
    """Return the quasi-diagonal ordering of the original items.

    Walks the linkage matrix from the final merge downwards, repeatedly
    replacing cluster ids (>= numItems) with their two constituents, so
    that highly correlated items end up adjacent in the returned ordering.

    Parameters
    ----------
    link : ndarray
        Linkage matrix as produced by scipy.cluster.hierarchy.linkage.

    Returns
    -------
    list of int
        Original item indices sorted by cluster proximity.
    """
    link = link.astype(int)
    # Start from the last merge: its two members are the root's children.
    sortIx = pd.Series([link[-1, 0], link[-1, 1]])
    numItems = link[-1, 3]  # number of original items
    while sortIx.max() >= numItems:  # entries >= numItems are cluster ids
        sortIx.index = range(0, sortIx.shape[0] * 2, 2)  # make space
        df0 = sortIx[sortIx >= numItems]  # find clusters
        i = df0.index
        j = df0.values - numItems
        sortIx[i] = link[j, 0]  # first constituent replaces the cluster id
        df0 = pd.Series(link[j, 1], index=i + 1)  # second constituent
        # Series.append was removed in pandas 2.0; concat is the supported way.
        sortIx = pd.concat([sortIx, df0])
        sortIx = sortIx.sort_index()  # re-sort
        sortIx.index = range(sortIx.shape[0])  # re-index
    return sortIx.tolist()
#------------------------------------------------------------------------------
# Recover the quasi-diagonal ordering and replot the reordered matrix.
sortIx = getQuasiDiag(link)
sortIx = corr.index[sortIx].tolist()  # map positions back to index labels
df0 = corr.iloc[sortIx, sortIx]       # reorder rows/columns by cluster
plotCorrMatrix('HRP3_corr1.png', df0, labels=df0.columns)

# 4) Capital allocation
# Relabel the columns with integers 0..3 so cov.loc[cItems, cItems] can slice
# both axes using the positional indices produced by getQuasiDiag.
cov_colnames = cov.columns.values  # keep the original names around
cov.rename(columns={'IGV.Close': 0, 'TLT.Close': 1, 'IAU.Close': 2, 'IYR.Close': 3}, inplace=True)
def getRecBipart(cov, sortIx):
    """Compute HRP allocations by recursive bisection.

    Starting from the quasi-diagonally sorted item list, repeatedly split
    each cluster in half and divide its weight between the two halves in
    inverse proportion to their cluster variances.

    Parameters
    ----------
    cov : DataFrame
        Covariance matrix whose row/column labels match the entries of
        `sortIx`.
    sortIx : list
        Items in quasi-diagonal order (output of getQuasiDiag, possibly
        relabelled).

    Returns
    -------
    Series
        Portfolio weights indexed by the entries of `sortIx`.
    """
    # Use a float Series: the weights become fractional, and assigning
    # floats into an int64 Series is deprecated in recent pandas.
    w = pd.Series(1.0, index=sortIx)
    cItems = [sortIx]  # initialize all items in one cluster
    while len(cItems) > 0:
        # Bisect every cluster that still has more than one member.
        cItems = [i[j:k] for i in cItems
                  for j, k in ((0, len(i) // 2), (len(i) // 2, len(i)))
                  if len(i) > 1]
        for i in range(0, len(cItems), 2):  # the two halves arrive in pairs
            cItems0 = cItems[i]      # cluster 1
            cItems1 = cItems[i + 1]  # cluster 2
            cVar0 = getClusterVar(cov, cItems0)
            cVar1 = getClusterVar(cov, cItems1)
            # The lower-variance half receives the larger share.
            alpha = 1 - cVar0 / (cVar0 + cVar1)
            w[cItems0] *= alpha      # weight 1
            w[cItems1] *= 1 - alpha  # weight 2
    return w
# Run the full HRP allocation and display the resulting weights.
hrp=getRecBipart(cov,sortIx)
print(hrp)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment