Skip to content

Instantly share code, notes, and snippets.

@jcreinhold
Created December 11, 2018 23:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jcreinhold/f026ecfcd0e8a8b80427707aab182e0c to your computer and use it in GitHub Desktop.
Save jcreinhold/f026ecfcd0e8a8b80427707aab182e0c to your computer and use it in GitHub Desktop.
CPU RPCA and SVT
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
rpca_cpu
implementations of RPCA on the CPU for low-rank and sparse
matrix decomposition as well as a nuclear-norm
minimization routine via singular value thresholding
for matrix completion
The RPCA implementation is heavily based on:
https://github.com/dganguli/robust-pca
The svt implementation was based on:
https://github.com/tonyduan/matrix-completion
References:
[1] Candès, E. J., Li, X., Ma, Y., & Wright, J. (2011).
Robust principal component analysis?. Journal of the ACM (JACM),
58(3), 11.
[2] Cai, J. F., Candès, E. J., & Shen, Z. (2010).
A singular value thresholding algorithm for matrix completion.
SIAM Journal on Optimization, 20(4), 1956-1982.
Author: Jacob Reinhold (jacob.reinhold@jhu.edu)
"""
__all__ = ['RPCA', 'svt']
import numpy as np
import scipy
class RPCA:
""" low-rank and sparse matrix decomposition via RPCA [1] """
def __init__(self, D, mu=None, lmbda=None):
self.D = D
self.S = np.zeros_like(self.D)
self.Y = np.zeros_like(self.D)
self.mu = mu or np.prod(self.D.shape) / (4 * self.norm_p(self.D, 2))
self.mu_inv = 1 / self.mu
self.lmbda = lmbda or 1 / np.sqrt(np.max(self.D.shape))
@staticmethod
def norm_p(M, p):
return np.sum(np.power(M, p))
@staticmethod
def shrink(M, tau):
return np.sign(M) * np.maximum(np.abs(M) - tau, 0)
def svd_threshold(self, M, tau):
U, s, Vt = scipy.linalg.svd(M, full_matrices=False)
return np.dot(U, np.dot(np.diag(self.shrink(s, tau)), Vt))
def fit(self, tol=None, max_iter=1000, iter_print=100):
i, err = 0, np.inf
Sk, Yk, Lk = self.S, self.Y, np.zeros_like(self.D)
_tol = tol or 1e-7 * self.norm_p(np.abs(self.D), 2)
while err > _tol and i < max_iter:
Lk = self.svd_threshold(
self.D - Sk + self.mu_inv * Yk, self.mu_inv)
Sk = self.shrink(
self.D - Lk + (self.mu_inv * Yk), self.mu_inv * self.lmbda)
Yk = Yk + self.mu * (self.D - Lk - Sk)
err = self.norm_p(np.abs(self.D - Lk - Sk), 2) / self.norm_p(self.D, 2)
i += 1
if (i % iter_print) == 0 or i == 1 or i > max_iter or err <= _tol:
print(f'Iteration: {i}; Error: {err:0.4e}')
self.L, self.S = Lk, Sk
return Lk, Sk
def svt(X, mask, tau=None, delta=None, eps=1e-2, max_iter=1000, iter_print=5):
""" matrix completion via singular value thresholding [2] """
Z = np.zeros_like(X)
tau = tau or 5 * np.sum(X.shape) / 2
delta = delta or 1.2 * np.prod(X.shape) / np.sum(mask)
for i in range(max_iter):
U, s, Vt = scipy.linalg.svd(Z, full_matrices=False)
s = np.maximum(s - tau, 0)
A = U @ np.diag(s) @ Vt
Z += delta * mask * (X - A)
error = np.linalg.norm(mask * (X - A)) / np.linalg.norm(mask * X)
if i % iter_print == 0: print(f'Iteration: {i}; Error: {error:.4e}')
if error < eps: break
return A
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment