@thelittlekid
Created April 19, 2020 22:32
Python version of the original R function at: https://github.com/jmhewitt/telefit/blob/master/R/cca.predict.R
import numpy as np
from scipy.linalg import eigh, inv
def cca_predict(X, Y, X_new, kx, ky, whitening=False):
    """
    Canonical correlation analysis and prediction.
    Python version of the original R function at:
    https://github.com/jmhewitt/telefit/blob/master/R/cca.predict.R

    :param X: training samples of the predictor with shape M1 x N (M1 variables, N observations)
    :param Y: training samples of the predictand with shape M2 x N (M2 variables, N observations)
    :param X_new: test samples of the predictor with shape M1 x N_new
    :param kx: budget (target dimension) for the dimension reduction of X
    :param ky: budget (target dimension) for the dimension reduction of Y
    :param whitening: whether to divide the data by its standard deviation (not suitable for variables with std = 0)
    :return: Y_hat -- predicted values of the predictand for the test samples, with shape M2 x N_new
    """
    X_mean, X_std = X.mean(axis=1, keepdims=True), X.std(axis=1, ddof=1, keepdims=True)
    Y_mean, Y_std = Y.mean(axis=1, keepdims=True), Y.std(axis=1, ddof=1, keepdims=True)

    # Center (and optionally whiten) each variable, then transpose to observations x variables
    if whitening:
        nXp = ((X - X_mean) / X_std).T
        nXp_new = ((X_new - X_mean) / X_std).T
        nYq = ((Y - Y_mean) / Y_std).T
    else:
        nXp = (X - X_mean).T
        nXp_new = (X_new - X_mean).T
        nYq = (Y - Y_mean).T

    n, p, q = nXp.shape[0], nXp.shape[1], nYq.shape[1]

    def cor_eigen(X_):
        """Eigendecomposition of the sample covariance matrix of X_ (observations x variables)."""
        if X_.shape[0] < X_.shape[1]:
            # Fewer observations than variables: decompose the smaller n x n Gram matrix
            # and map its eigenvectors back to variable space.
            X_ = X_ / np.sqrt(n - 1)
            w, v = eigh(X_.dot(X_.T))
            w, v = w[::-1], v[:, ::-1]  # scipy.linalg.eigh returns eigenvalues in ascending order
            w[w < 0] = 0  # clip small negative eigenvalues caused by round-off
            # Guard the back-projection against zero eigenvalues (centering makes the
            # Gram matrix rank-deficient); only the leading columns are used downstream.
            nz = w > 0
            vecs = np.zeros((X_.shape[1], w.size))
            vecs[:, nz] = X_.T.dot(v[:, nz]) / np.sqrt(w[nz])
            return w, vecs
        else:
            w, v = eigh(X_.T.dot(X_) / (n - 1))
            return w[::-1], v[:, ::-1]

    pRp_w, pRp_v = cor_eigen(nXp)
    qRq_w, qRq_v = cor_eigen(nYq)
    pLp = np.diag(pRp_w)  # eigenvalues of X's covariance (unused below, kept to mirror the R code)
    pEp = pRp_v           # eigenvectors (EOFs) of X
    qMq = np.diag(qRq_w)  # eigenvalues of Y's covariance (unused below, kept to mirror the R code)
    qFq = qRq_v           # eigenvectors (EOFs) of Y

    # Project onto the leading kx / ky eigenvectors and standardize the scores
    nUpp = nXp.dot(pEp[:, :kx])
    nUpp_mean, nUpp_std = nUpp.mean(axis=0, keepdims=True), nUpp.std(axis=0, ddof=1, keepdims=True)
    nUpp = (nUpp - nUpp_mean) / nUpp_std
    nVqq = nYq.dot(qFq[:, :ky])
    nVqq_mean, nVqq_std = nVqq.mean(axis=0, keepdims=True), nVqq.std(axis=0, ddof=1, keepdims=True)
    nVqq = (nVqq - nVqq_mean) / nVqq_std
    nUpp_new = (nXp_new.dot(pEp[:, :kx]) - nUpp_mean) / nUpp_std

    # Correlation matrices of the standardized scores; pRp and qRq equal the identity up to
    # round-off because principal-component scores are uncorrelated.
    pRp = nUpp.T.dot(nUpp) / (n - 1)
    pRq = nUpp.T.dot(nVqq) / (n - 1)
    qRq = nVqq.T.dot(nVqq) / (n - 1)

    # Cook eq. 19: squared canonical correlations and Y-side canonical directions
    B_w, B = eigh(inv(qRq).dot(pRq.T).dot(inv(pRp)).dot(pRq))
    B_w, B = B_w[::-1], B[:, ::-1]
    B_w[B_w < 0] = 0
    qLambdaq = np.diag(B_w)

    # Glahn (1968) eq. 13: X-side canonical directions, then predict the Y scores
    A = inv(pRp).dot(pRq).dot(B).dot(np.diag(1 / np.sqrt(B_w)))
    V_hat = nUpp_new.dot(A).dot(qLambdaq).dot(inv(B))

    # Unscale and backtransform V_hat to the original space of Y
    V_hat = V_hat * nVqq_std + nVqq_mean
    Y_hat = V_hat.dot(qFq[:, :ky].T).T
    if whitening:
        Y_hat = Y_hat * Y_std + Y_mean
    else:
        Y_hat = Y_hat + Y_mean
    return Y_hat
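A minimal usage sketch (an addition, not part of the original gist): the dimensions and the synthetic data below are illustrative assumptions, chosen so that X has more variables than observations in order to exercise the Gram-matrix branch of cor_eigen.

# --- Usage sketch (illustrative; synthetic data and dimensions are assumptions) ---
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    M1, M2, N, N_new = 50, 20, 30, 5  # predictor/predictand variables, training/test observations
    X = rng.standard_normal((M1, N))          # predictors: M1 variables x N observations
    Y = rng.standard_normal((M2, N))          # predictands: M2 variables x N observations
    X_new = rng.standard_normal((M1, N_new))  # test predictors: M1 variables x N_new observations
    Y_hat = cca_predict(X, Y, X_new, kx=5, ky=5, whitening=True)
    print(Y_hat.shape)  # (20, 5): one prediction per predictand variable and test observation

With random inputs the predictions carry no signal, of course; the point is only to show the expected variables-by-observations orientation of the arguments and the shape of the returned Y_hat.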