Instantly share code, notes, and snippets.

# ahwillia/cv.py

Last active November 29, 2023 22:19
Show Gist options
• Save ahwillia/65d8f87fcd4bded3676d67b55c1a3954 to your computer and use it in GitHub Desktop.
Cross-validation for matrix factorization models
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters
 import numpy as np from numpy.random import randn, rand from scipy.optimize import minimize import matplotlib.pyplot as plt from nnls import nnlsm_blockpivot as nnlstsq import itertools from scipy.spatial.distance import cdist def censored_lstsq(A, B, M): """Solves least squares problem with missing data in B Note: uses a broadcasted solve for speed. Args ---- A (ndarray) : m x r matrix B (ndarray) : m x n matrix M (ndarray) : m x n binary matrix (zeros indicate missing values) Returns ------- X (ndarray) : r x n matrix that minimizes norm(M*(AX - B)) """ if A.ndim == 1: A = A[:,None] # else solve via tensor representation rhs = np.dot(A.T, M * B).T[:,:,None] # n x r x 1 tensor T = np.matmul(A.T[None,:,:], M.T[:,:,None] * A[None,:,:]) # n x r x r tensor try: # transpose to get r x n return np.squeeze(np.linalg.solve(T, rhs), axis=-1).T except: r = T.shape[1] T[:,np.arange(r),np.arange(r)] += 1e-6 return np.squeeze(np.linalg.solve(T, rhs), axis=-1).T def censored_nnlstsq(A, B, M): """Solves nonnegative least-squares problem with missing data in B Args ---- A (ndarray) : m x r matrix B (ndarray) : m x n matrix M (ndarray) : m x n binary matrix (zeros indicate missing values) Returns ------- X (ndarray) : nonnegative r x n matrix that minimizes norm(M*(AX - B)) """ if A.ndim == 1: A = A[:,None] rhs = np.dot(A.T, M * B).T[:,:,None] # n x r x 1 tensor T = np.matmul(A.T[None,:,:], M.T[:,:,None] * A[None,:,:]) # n x r x r tensor X = np.empty((B.shape[1], A.shape[1])) for n in range(B.shape[1]): X[n] = nnlstsq(T[n], rhs[n], is_input_prod=True)[0].T return X.T def cv_pca(data, rank, M=None, p_holdout=0.3, nonneg=False): """Fit PCA or NMF while holding out a fraction of the dataset. """ # choose solver for alternating minimization if nonneg: solver = censored_nnlstsq else: solver = censored_lstsq # create masking matrix if M is None: M = np.random.rand(*data.shape) > p_holdout # initialize U randomly if nonneg: U = np.random.rand(data.shape[0], rank) else: U = np.random.randn(data.shape[0], rank) # fit pca/nmf for itr in range(50): Vt = solver(U, data, M) U = solver(Vt.T, data.T, M.T).T # return result and test/train error resid = np.dot(U, Vt) - data train_err = np.mean(resid[M]**2) test_err = np.mean(resid[~M]**2) return U, Vt, train_err, test_err def cv_kmeans(data, rank, p_holdout=.3, M=None): """Fit kmeans while holding out a fraction of the dataset. """ # create masking matrix if M is None: M = np.random.rand(*data.shape) > p_holdout # initialize cluster centers Vt = np.random.randn(rank, data.shape[1]) U = np.empty((data.shape[0], rank)) rn = np.arange(U.shape[0]) # initialize missing data randomly imp = data.copy() imp[~M] = np.random.randn(*data.shape)[~M] # initialize cluster centers far apart Vt = [imp[np.random.randint(data.shape[0])]] while len(Vt) < rank: i = np.argmax(np.min(cdist(imp, Vt), axis=1)) Vt.append(imp[i]) Vt = np.array(Vt) # fit kmeans for itr in range(50): # update cluster assignments clus = np.argmin(cdist(imp, Vt), axis=1) U.fill(0.0) U[rn, clus] = 1.0 # update centroids Vt = censored_lstsq(U, imp, M) assert np.all(np.sum(np.abs(Vt), axis=1) > 0) # update estimates of missing data imp[~M] = np.dot(U, Vt)[~M] # return result and test/train error resid = np.dot(U, Vt) - data train_err = np.mean(resid[M]**2) test_err = np.mean(resid[~M]**2) return clus, U, Vt, train_err, test_err def plot_pca(): # parameters N, R = 150, 4 noise = 2 replicates = 1 ranks = np.arange(1, 8) # initialize U = np.random.randn(N, R) Vt = np.random.randn(R, N) data = np.dot(U, Vt) + noise*np.random.randn(N, N) train_err, test_err = [], [] # fit models for rnk, _ in itertools.product(ranks, range(replicates)): tr, te = cv_pca(data, rnk)[2:] train_err.append((rnk, tr)) test_err.append((rnk, te)) # make plot fig, ax = plt.subplots(1, 1, figsize=(4, 3.5)) ax.plot(*list(zip(*train_err)), 'o-b', label='Train Data') ax.plot(*list(zip(*test_err)), 'o-r', label='Test Data') ax.set_ylabel('Mean Squared Error') ax.set_xlabel('Number of PCs') ax.set_title('PCA') ax.axvline(4, color='k', dashes=[2,2]) ax.spines['top'].set_visible(False) ax.spines['right'].set_visible(False) ax.legend() fig.tight_layout() fig.savefig('../../img/pca-crossval/pca_cv_curve.pdf') def plot_nmf(): # parameters N, R = 150, 4 noise = .8 replicates = 1 ranks = np.arange(1, 8) # initialize problem U = np.random.rand(N, R) Vt = np.random.rand(R, N) data = np.dot(U, Vt) + noise*np.random.rand(N, N) train_err, test_err = [], [] # fit models for rnk, _ in itertools.product(ranks, range(replicates)): tr, te = cv_pca(data, rnk, nonneg=True)[2:] train_err.append((rnk, tr)) test_err.append((rnk, te)) # make plot fig, ax = plt.subplots(1, 1, figsize=(4, 3.5)) ax.plot(*list(zip(*train_err)), 'o-b', label='Train Data') ax.plot(*list(zip(*test_err)), 'o-r', label='Test Data') ax.set_ylabel('Mean Squared Error') ax.set_xlabel('Number of factors') ax.set_title('NMF') ax.axvline(4, color='k', dashes=[2,2]) ax.spines['top'].set_visible(False) ax.spines['right'].set_visible(False) ax.legend() fig.tight_layout() fig.savefig('../../img/pca-crossval/nmf_cv_curve.pdf') def plot_kmeans(): # parameters N, R = 150, 4 noise = 1.5 ranks = np.arange(1, 8) replicates = 10 # initialize problem U = np.zeros((N, R)) U[np.arange(N), np.random.randint(R, size=N)] = 1 Vt = np.random.randn(R, N) data = np.dot(U, Vt) + noise*np.random.randn(N, N) train_err, test_err, rr = [], [], [] # fit models for rnk, _ in itertools.product(ranks, range(replicates)): tr, te = cv_kmeans(data, rnk)[3:] train_err.append(tr) test_err.append(te) rr.append(rnk) rr = np.array(rr) train_err, test_err = np.array(train_err), np.array(test_err) mean_train = [np.mean(train_err[rr==rnk]) for rnk in ranks] mean_test = [np.mean(test_err[rr==rnk]) for rnk in ranks] # make plot fig, ax = plt.subplots(1, 1, figsize=(4, 3.5)) ax.plot(ranks, mean_train, '-b', label='Train Data') ax.plot(ranks, mean_test, '-r', label='Test Data') ax.set_ylabel('Mean Squared Error') ax.set_xlabel('Number of clusters') ax.set_title('K-means clustering') ax.axvline(4, color='k', dashes=[2,2]) ax.plot(rr, train_err, 'ob', alpha=.5, ms=3, mec=None) ax.plot(rr, test_err, 'or', alpha=.5, ms=3, mec=None) ax.spines['top'].set_visible(False) ax.spines['right'].set_visible(False) ax.legend() fig.tight_layout() fig.savefig('../../img/pca-crossval/kmeans_cv_curve.pdf') if __name__ == '__main__': plot_pca() plot_nmf() plot_kmeans() plt.show()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters

### VeroMieites commented Sep 28, 2022

Could you please leave a link or a package name for the line:
`from nnls import nnlsm_blockpivot as nnlstsq`
I couldn't find it anywhere...
Ty!

### ahwillia commented Sep 28, 2022

There are two files in this gist, I think it should be contained in the second one

### ahwillia commented Sep 28, 2022 • edited

Also, it could be a bit slower, but `scipy.optimize.nnls` could probably work as well