Skip to content

Instantly share code, notes, and snippets.

@rocketheat
Forked from jcreinhold/rpca_gpu.py
Created May 2, 2022 00:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rocketheat/cf790e48a29f72cdc9b9d7fa96d368c0 to your computer and use it in GitHub Desktop.
Save rocketheat/cf790e48a29f72cdc9b9d7fa96d368c0 to your computer and use it in GitHub Desktop.
GPU RPCA and SVT
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
rpca_gpu
implementations of RPCA on the GPU (leveraging pytorch)
for low-rank and sparse matrix decomposition as well as
a nuclear-norm minimization routine via singular value
thresholding for matrix completion
The RPCA implementation is heavily based on:
https://github.com/dganguli/robust-pca
The svt implementation was based on:
https://github.com/tonyduan/matrix-completion
References:
[1] Candès, E. J., Li, X., Ma, Y., & Wright, J. (2011).
Robust principal component analysis?. Journal of the ACM (JACM),
58(3), 11.
[2] Cai, J. F., Candès, E. J., & Shen, Z. (2010).
A singular value thresholding algorithm for matrix completion.
SIAM Journal on Optimization, 20(4), 1956-1982.
Author: Jacob Reinhold (jacob.reinhold@jhu.edu)
"""
__all__ = ['RPCA_gpu', 'svt_gpu']
import numpy as np
import torch
import torch.nn.functional as F
class RPCA_gpu:
""" low-rank and sparse matrix decomposition via RPCA [1] with CUDA capabilities """
def __init__(self, D, mu=None, lmbda=None):
self.D = D
self.S = torch.zeros_like(self.D)
self.Y = torch.zeros_like(self.D)
self.mu = mu or (np.prod(self.D.shape) / (4 * self.norm_p(self.D, 2))).item()
self.mu_inv = 1 / self.mu
self.lmbda = lmbda or 1 / np.sqrt(np.max(self.D.shape))
@staticmethod
def norm_p(M, p):
return torch.sum(torch.pow(M, p))
@staticmethod
def shrink(M, tau):
return torch.sign(M) * F.relu(torch.abs(M) - tau) # hack to save memory
def svd_threshold(self, M, tau):
U, s, V = torch.svd(M, some=True)
return torch.mm(U, torch.mm(torch.diag(self.shrink(s, tau)), V.t()))
def fit(self, tol=None, max_iter=1000, iter_print=100):
i, err = 0, np.inf
Sk, Yk, Lk = self.S, self.Y, torch.zeros_like(self.D)
_tol = tol or 1e-7 * self.norm_p(torch.abs(self.D), 2)
while err > _tol and i < max_iter:
Lk = self.svd_threshold(
self.D - Sk + self.mu_inv * Yk, self.mu_inv)
Sk = self.shrink(
self.D - Lk + (self.mu_inv * Yk), self.mu_inv * self.lmbda)
Yk = Yk + self.mu * (self.D - Lk - Sk)
err = self.norm_p(torch.abs(self.D - Lk - Sk), 2) / self.norm_p(self.D, 2)
i += 1
if (i % iter_print) == 0 or i == 1 or i > max_iter or err <= _tol:
print(f'Iteration: {i}; Error: {err:0.4e}')
self.L, self.S = Lk, Sk
return Lk, Sk
def svt_gpu(X, mask, tau=None, delta=None, eps=1e-2, max_iter=1000, iter_print=5):
""" matrix completion via singular value thresholding [2] with CUDA capabilties """
Z = torch.zeros_like(X)
tau = tau or 5 * np.sum(X.shape) / 2
delta = delta or (1.2 * np.prod(X.shape) / torch.sum(mask)).item()
for i in range(max_iter):
U, s, V = torch.svd(Z, some=True)
s = F.relu(s - tau) # hack to save memory
A = U @ torch.diag(s) @ V.t()
Z += delta * mask * (X - A)
error = (torch.norm(mask * (X - A)) / torch.norm(mask * X)).item()
if i % iter_print == 0: print(f'Iteration: {i}; Error: {error:.4e}')
if error < eps: break
return A
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment