Skip to content

Instantly share code, notes, and snippets.

@kingjr
Created July 30, 2021 08:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kingjr/bed8d15f76857c8582d0813e72f85ade to your computer and use it in GitHub Desktop.
Save kingjr/bed8d15f76857c8582d0813e72f85ade to your computer and use it in GitHub Desktop.
banded_ridge.py
import numpy as np
from numpy.linalg import multi_dot
from scipy.linalg import svd
from sklearn.linear_model import RidgeCV
from sklearn.model_selection import KFold
def get_from_indices(X, indices):
out = np.zeros_like(X[0])
for idx in np.unique(indices):
sel = np.where(indices == idx)
out[sel] = X[idx][sel]
return out
class BandedRidgeCV(RidgeCV):
def __init__(
self,
alphas,
ratios,
groups,
cv=4,
fit_intercept=True,
alpha_per_target=True,
):
self.alphas = np.asarray(alphas)
self.ratios = np.asarray(ratios)
self.groups = np.asarray(groups)
if not np.array_equal(np.unique(groups), [0, 1]):
raise NotImplementedError("groups must be composed of 0s and 1s")
if fit_intercept:
raise NotImplementedError # TODO, use StandardScaler pipeline itmt
self.alpha_per_target = alpha_per_target
self.cv = cv
def _prep_ratio(self, ratio):
"""Prepare banded SVD"""
angle = np.arctan(ratio)
lambda1 = np.cos(angle) / np.cos(np.pi / 4)
lambda2 = np.sin(angle) / np.cos(np.pi / 4)
bands = np.r_[
np.ones(sum(self.groups == 0)) * lambda1,
np.ones(sum(self.groups == 1)) * lambda2,
]
Cinv = np.diag(bands ** -1)
return Cinv, bands
def _find_best_alpha(self, X, Y, train, test, ratio):
Cinv, bands = self._prep_ratio(ratio)
U, S, VT = svd(X / bands[None, :], full_matrices=False)
V = VT.T
UTY = U[train].T @ Y[train]
# Loop across alphas
errors = np.ones((len(self.alphas), Y.shape[1])) * np.inf
Ws = np.zeros((len(self.alphas), Y.shape[1], X.shape[1]))
for idx, alpha in enumerate(self.alphas):
# I didn't manage to apply the LOO trick to banded ridge
#
# D = 1. / (S2 + alpha) - alpha ** -1
# K = U @ np.diag(D) @ UTY + alpha ** -1 * Y
# G_diag = (D * U ** 2).sum(axis=-1) + alpha ** -1
# error = K / G_diag[:, None]
# W = K.T @ X
# Solve
W = multi_dot([Cinv, V, np.diag(S / (S ** 2 + alpha)), UTY])
err = ((X[test] @ W - Y[test]) ** 2).mean(0)
errors[idx] = err
Ws[idx] = W.T
best = np.argmin(errors, axis=0)
if self.alpha_per_target:
W = get_from_indices(Ws, best)
errors = get_from_indices(errors, best)
alphas = self.alphas[best]
else:
counts = np.bincount(best)
best = np.argmax(counts)
W = Ws[best]
errors = errors[best]
alphas = self.alphas[best]
return W, errors, alphas
def _find_best_ratio(self, X, Y, train, test):
W = np.zeros((len(self.ratios), Y.shape[1], X.shape[1]))
alphas = np.zeros((len(self.ratios), Y.shape[1]))
errors = np.ones((len(self.ratios), Y.shape[1])) * np.inf
for idx, ratio in enumerate(self.ratios):
# Compute error with alpha hyperparam optim
ratio_W, ratio_err, ratio_alpha = self._find_best_alpha(
X, Y, train, test, ratio
)
alphas[idx] = ratio_alpha
errors[idx] = ratio_err
W[idx] = ratio_W
best = np.argmin(errors, axis=0)
if self.alpha_per_target:
W = get_from_indices(W, best)
errors = get_from_indices(errors, best)
alphas = get_from_indices(alphas, best)
ratios = self.ratios[best]
else:
counts = np.bincount(best)
best = np.argmax(counts)
W = W[best]
errors = errors[best]
alphas = alphas[best]
ratios = self.ratios[best]
return W, errors, alphas, ratios
def fit(self, X, Y):
if Y.ndim == 1:
Y = Y[:, None]
cv = KFold(self.cv) if isinstance(self.cv, int) else self.cv
W = np.zeros((cv.n_splits, Y.shape[1], X.shape[1]))
errors = np.zeros((cv.n_splits, Y.shape[1]))
alphas = np.zeros((cv.n_splits, Y.shape[1]))
ratios = np.zeros((cv.n_splits, Y.shape[1]))
for split, (train, test) in enumerate(cv.split(X, Y)):
(
W[split],
errors[split],
alphas[split],
ratios[split],
) = self._find_best_ratio(X, Y, train, test)
self.errors_ = np.mean(errors, axis=0)
self.alphas_ = 10 ** np.mean(np.log10(alphas), axis=0)
self.ratios_ = 10 ** np.mean(np.log10(ratios), axis=0)
self.coef_ = W.mean(0) # TODO retrain?
self.intercept_ = 0.0
return self
def predict(self, X):
return X @ self.coef_.T
if __name__ == "__main__":
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
def make_data():
np.random.seed(0)
dy = 3 # e.g. 3 voxels
dx1 = 5 # e.g. 5 dims of wordembedding
dx2 = 5 # 5 dims of gpt2 emb
dx = dx1 + dx2
d = dx + dy
cov = np.random.randn(d, d)
cov = cov.T @ cov
X = np.random.multivariate_normal(mean=np.zeros(d), cov=cov, size=500)
X, y = X[:, :dx], X[:, dx:]
groups = [0] * dx1 + [1] * dx2
assert len(groups) == X.shape[1]
return X, y, groups
X, y, groups = make_data()
# ridge regression
alphas = np.logspace(-2, 5, 8)
ridge = RidgeCV(alphas, fit_intercept=False)
# banded ridge regression
# regularization ratio between group1 and group2
ratios = 10 ** np.linspace(-1.5, 1.5, 5)
banded_ridge = BandedRidgeCV(alphas, ratios, groups, fit_intercept=False)
# compute score
models = dict(ridge=ridge, banded_ridge=banded_ridge)
for name, model in models.items():
pipe = make_pipeline(StandardScaler(), model)
pipe.fit(X, y)
print(name, pipe.score(X, y))
@kingjr
Copy link
Author

kingjr commented Jul 30, 2021

pipe.fit(X, y)
# knockout score
K = np.ones_like(X)
K[:, group==0] = 0
pipe.score(X*K, y)

K = np.ones_like(X)
K[:, group==1] = 0
pipe.score(X*K, y)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment