Non-negative matrix factorization for I divergence
""" Non-negative matrix factorization for I divergence
This code was implements Lee and Seung's multiplicative updates algorithm
for NMF with I divergence cost.
Lee D. D., Seung H. S., Learning the parts of objects by non-negative
matrix factorization. Nature, 1999
# Author: Olivier Mangin <>
import warnings
import numpy as np
import scipy.sparse as sp
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils import atleast2d_or_csr
from sklearn.utils.extmath import safe_sparse_dot
def check_non_negative(X, whom):
X = if sp.issparse(X) else X
if (X < 0).any():
raise ValueError("Negative values in data passed to %s" % whom)
def _normalize_sum(a, axis=0, eps=1.e-16):
if axis >= len(a.shape):
raise ValueError
return a / (eps + np.expand_dims(np.sum(a, axis=axis), axis))
def _scale(matrix, factors, axis=0):
"""Scales line or columns of a matrix.
:param matrix: 2-dimensional array
:param factors: 1-dimensional array
:param axis: 0: columns are scaled, 1: lines are scaled
if not (len(matrix.shape) == 2):
raise ValueError(
"Wrong array shape: %s, should have only 2 dimensions."
% str(matrix.shape))
if not axis in (0, 1):
raise ValueError('Wrong axis, should be 0 (scaling lines)\
or 1 (scaling columns).')
# Transform factors given as columne shaped matrices
factors = np.squeeze(np.asarray(factors))
if axis == 1:
factors = factors[:, np.newaxis]
return np.multiply(matrix, factors)
def generalized_KL(x, y, eps=1.e-8, axis=None):
return (np.multiply(x, np.log(np.divide(x + eps, y + eps))) - x + y
def _special_sparse_dot(a, b, refmat):
"""Computes dot product of a and b on indices where refmat is nonnzero
and returns sparse csr matrix with same structure than refmat.
First calls to eliminate_zeros on refmat which might modify the structure
of refmat.
a, b: dense arrays
refmat: sparse matrix
Dot product of a and b must have refmat's shape.
ii, jj = refmat.nonzero()
dot_vals = np.multiply(a[ii, :], b.T[jj, :]).sum(axis=1)
c = sp.coo_matrix((dot_vals, (ii, jj)), shape=refmat.shape)
return c.tocsr()
class KLdivNMF(BaseEstimator, TransformerMixin):
"""Non negative factorization with Kullback Leibler divergence cost.
n_components: int or None
Number of components, if n_components is not set all components
are kept
init: 'nndsvd' | 'nndsvda' | 'nndsvdar' | 'random'
Method used to initialize the procedure.
Default: 'nndsvdar' if n_components < n_features, otherwise random.
Valid options::
'nndsvd': Nonnegative Double Singular Value Decomposition (NNDSVD)
initialization (better for sparseness)
'nndsvda': NNDSVD with zeros filled with the average of X
(better when sparsity is not desired)
'nndsvdar': NNDSVD with zeros filled with small random values
(generally faster, less accurate alternative to NNDSVDa
for when sparsity is not desired)
'random': non-negative random matrices
tol: double, default: 1e-4
Tolerance value used in stopping conditions.
max_iter: int, default: 200
Number of iterations to compute.
subit: int, default: 10
Number of sub-iterations to perform on W (resp. H) before switching
to H (resp. W) update.
`components_` : array, [n_components, n_features]
Non-negative components of the data
random_state : int or RandomState
Random number generator seed control.
>>> import numpy as np
>>> X = np.array([[1,1], [2, 1], [3, 1.2], [4, 1], [5, 0.8], [6, 1]])
>>> from kl_nmf import KLdivNMF
>>> model = KLdivNMF(n_components=2, init='random', random_state=0)
KLdivNMF(eps=1e-08, init='random', max_iter=200, n_components=2,
random_state=0, subit=10, tol=1e-06)
>>> model.components_
array([[ 0.50303234, 0.49696766],
[ 0.93326505, 0.06673495]])
This implements
Lee D. D., Seung H. S., Learning the parts of objects by non-negative
matrix factorization. Nature, 1999
def __init__(self, n_components=None, tol=1e-6, max_iter=200, eps=1.e-8,
subit=10, random_state=None):
self.n_components = n_components
self._init_dictionary = None
self.random_state = random_state
self.tol = tol
self.max_iter = max_iter
self.eps = eps
# Only for gradient updates
self.subit = subit
def _init(self, X):
n_samples, n_features = X.shape
if self._init_dictionary is None:
H_init = _normalize_sum(np.abs(np.random.random(
(self.n_components, n_features))) + .01, axis=1)
== (self.n_components, n_features))
H_init = self._init_dictionary
W_init =
return W_init, H_init
def fit_transform(self, X, y=None, weights=1., _fit=True,
return_errors=False, scale_W=False):
"""Learn a NMF model for the data X and returns the transformed data.
This is more efficient than calling fit followed by transform.
X: {array-like, sparse matrix}, shape = [n_samples, n_features]
Data matrix to be decomposed
weights: {array-like, sparse matrix}, shape = [n_samples, n_features]
Weights on the cost function used as coefficients on each
element of the data. If smaller dimension is provided, standard
numpy broadcasting is used.
return_errors: boolean
if True, the list of reconstruction errors along iterations is
scale_W: boolean (default: False)
Whether to force scaling of W during updates. This is only relevant
if components are normalized.
_fit: if True (default), update the model, else only compute transform
data: array, [n_samples, n_components]
Transformed data
or (data, errors) if return_errors
X = atleast2d_or_csr(X)
check_non_negative(X, "")
n_samples, n_features = X.shape
if not self.n_components:
self.n_components = n_features
W, H = self._init(X)
if _fit:
self.components_ = H
prev_error = np.Inf
tol = self.tol * n_samples * n_features
if return_errors:
errors = []
for n_iter in xrange(1, self.max_iter + 1):
# Stopping condition
error = self.error(X, W, self.components_, weights=weights)
if prev_error - error < tol:
prev_error = error
if return_errors:
W = self._update(X, W, _fit=_fit))
if n_iter == self.max_iter and tol > 0:
warnings.warn("Iteration limit reached during fit\n")
if return_errors:
return W, errors
return W
def _update(self, X, W, _fit=True, scale_W=False, eps=1.e-8):
"""Perform one update iteration.
Updates components if _fit and returns updated coefficients.
_fit: boolean (default: True)
Whether to update components.
scale_W: boolean (default: False)
Whether to force scaling of W. This is only relevant if
components are normalized.
if scale_W:
# This is only relevant if components are normalized.
# Not always usefull but might improve convergence speed:
# Scale W lines to have same sum than X lines
W = _scale(_normalize_sum(W, axis=1), X.sum(axis=1), axis=1)
Q = self._Q(X, W, self.components_, eps=eps)
# update W
W = self._updated_W(X, W, self.components_, Q=Q)
if _fit:
# update H
self.components_ = self._updated_H(X, W, self.components_, Q=Q)
return W
def fit(self, X, y=None, **params):
"""Learn a NMF model for the data X.
X: {array-like, sparse matrix}, shape = [n_samples, n_features]
Data matrix to be decomposed
self.fit_transform(X, **params)
return self
def transform(self, X, **params):
"""Transform the data X according to the fitted NMF model
X: {array-like, sparse matrix}, shape = [n_samples, n_features]
Data matrix to be transformed by the model
data: array, [n_samples, n_components]
Transformed data
self._init_dictionary = self.components_
params['_fit'] = False
return self.fit_transform(X, **params)
# Helpers for beta divergence and related updates
# Errors and performance estimations
def error(self, X, W, H=None, weights=1., eps=1.e-8):
X = atleast2d_or_csr(X)
if H is None:
H = self.components_
if sp.issparse(X):
WH = _special_sparse_dot(W, H, X)
# Avoid computing all values of WH to get their sum
WH_sum = np.sum(np.multiply(np.sum(W, axis=0), np.sum(H, axis=1)))
return (np.multiply(,
np.log(np.divide( + eps, + eps))
)).sum() - + WH_sum
return generalized_KL(X,, H))
# Projections
def scale(self, W, H, factors):
"""Scale W columns and H rows inversely, according to the given
safe_factors = factors + self.eps
s_W = _scale(W, safe_factors, axis=0)
s_H = _scale(H, 1. / safe_factors, axis=1)
return s_W, s_H
# Update rules
def _Q(cls, X, W, H, eps=1.e-8):
"""Computes X / (WH)
where '/' is element-wise and WH is a matrix product.
# X should be at least 2D or csr
if sp.issparse(X):
WH = _special_sparse_dot(W, H, X) = ( + eps) / ( + eps)
return WH
return np.divide(X + eps,, H) + eps)
def _updated_W(cls, X, W, H, weights=1., Q=None, eps=1.e-8):
if Q is None:
Q = cls._Q(X, W, H, eps=eps)
W = np.multiply(W, safe_sparse_dot(Q, H.T))
return W
def _updated_H(cls, X, W, H, weights=1., Q=None, eps=1.e-8):
if Q is None:
Q = cls._Q(X, W, H, eps=eps)
H = np.multiply(H, safe_sparse_dot(W.T, Q))
H = _normalize_sum(H, axis=1)
return H
import unittest
import numpy as np
from numpy.testing import assert_array_almost_equal
import scipy.sparse as sp
from nmf_kl import _scale, _special_sparse_dot, generalized_KL, KLdivNMF
def random_NN_matrix(shape):
return np.abs(np.random.random(shape))
def random_NN_sparse(h, w, density):
r = sp.rand(h, w, density) = np.abs(
return r
def is_NN(a):
return np.all(a >= 0)
class TestScale(unittest.TestCase):
def test_shape(self):
mtx = np.zeros((3, 4))
fact = np.zeros((3,))
self.assertEqual(mtx.shape, _scale(mtx, fact, axis=1).shape)
def test_error_on_wrong_axis(self):
mtx = np.zeros((3, 4))
fact = np.zeros((4,))
with self.assertRaises(ValueError):
_scale(mtx, fact, axis=3)
def test_error_on_3D_array(self):
mtx = np.zeros((3, 4, 6))
fact = np.zeros((3,))
with self.assertRaises(ValueError):
_scale(mtx, fact, axis=1)
def test_error_on_1D_array(self):
mtx = np.zeros((3,))
fact = np.zeros((3,))
with self.assertRaises(ValueError):
_scale(mtx, fact, axis=1)
def test_error_on_wrong_factor_shape(self):
mtx = np.zeros((3, 4))
fact = np.zeros((2,))
with self.assertRaises(ValueError):
_scale(mtx, fact, axis=1)
def test_scale_lines(self):
mtx = np.array([[1, 2, 3], [4, 5, 6]])
fact = np.array([2, 3])
scaled = _scale(mtx, fact, axis=1)
ok = np.array([[2, 4, 6], [12, 15, 18]])
assert_array_almost_equal(ok, scaled)
def test_scale_columns(self):
mtx = np.array([[1, 2, 3], [4, 5, 6]])
fact = np.array([3, 2, 1])
scaled = _scale(mtx, fact, axis=0)
ok = np.array([[3, 4, 3], [12, 10, 6]])
assert_array_almost_equal(ok, scaled)
class TestError(unittest.TestCase):
n_samples = 20
n_components = 3
n_features = 30
def setUp(self):
self.X = random_NN_sparse(self.n_samples, self.n_features, .1)
self.W = random_NN_matrix((self.n_samples, self.n_components))
self.H = random_NN_matrix((self.n_components, self.n_features))
self.nmf = KLdivNMF(n_components=3, tol=1e-4,
max_iter=200, eps=1.e-8, subit=10)
self.nmf.components_ = random_NN_matrix((self.n_components,
def test_error_is_gen_kl(self):
Xdense = self.X.todense()
err = self.nmf.error(Xdense, self.W, H=self.H)
kl = generalized_KL(Xdense,
assert_array_almost_equal(err, kl)
def test_error_sparse(self):
err_dense = self.nmf.error(self.X.todense(), self.W, H=self.H)
err_sparse = self.nmf.error(self.X, self.W, H=self.H)
assert_array_almost_equal(err_dense, err_sparse)
def test_error_is_gen_kl_with_compenents(self):
Xdense = self.X.todense()
err = self.nmf.error(Xdense, self.W)
kl = generalized_KL(Xdense,
assert_array_almost_equal(err, kl)
class TestUpdates(unittest.TestCase):
n_samples = 20
n_components = 3
n_features = 30
def setUp(self):
self.X = random_NN_matrix((self.n_samples, self.n_features))
self.W = random_NN_matrix((self.n_samples, self.n_components))
self.H = random_NN_matrix((self.n_components, self.n_features))
self.nmf = KLdivNMF(n_components=3, tol=1e-4,
max_iter=200, eps=1.e-8, subit=10)
self.nmf.components_ = self.H
def test_W_remains_NN(self):
W = self.nmf._updated_W(self.X, self.W, self.H)
def test_H_remains_NN(self):
H = self.nmf._updated_H(self.X, self.W, self.H)
def test_decreases_KL(self):
dkl_prev = self.nmf.error(self.X, self.W)
W = self.nmf._update(self.X, self.W, _fit=True)
dkl_next = self.nmf.error(self.X, W)
self.assertTrue(dkl_prev > dkl_next)
def test_no_compenents_update(self):
self.nmf._update(self.X, self.W, _fit=False)
self.assertTrue((self.nmf.components_ == self.H).all())
class TestSparseUpdates(TestUpdates):
"""Checks that updates are OK with sparse input.
def setUp(self):
self.X = random_NN_sparse(self.n_samples, self.n_features, .5).tocsr()
self.W = random_NN_matrix((self.n_samples, self.n_components))
self.H = random_NN_matrix((self.n_components, self.n_features))
self.nmf = KLdivNMF(n_components=3, tol=1e-4,
max_iter=200, eps=1.e-8, subit=10)
self.nmf.components_ = self.H
class TestFitTransform(unittest.TestCase):
def setUp(self):
self.nmf = KLdivNMF(n_components=3, tol=1e-6,
max_iter=200, eps=1.e-8, subit=10)
def test_cv(self):
X = random_NN_matrix((10, 5))
W, errors = self.nmf.fit_transform(X, return_errors=True)
# Last errors should be very close
self.assertTrue(abs(errors[-1] - errors[-2]) < errors[0] * 1.e-2)
def test_zero_error_on_fact_data(self):
X =, 2)), random_NN_matrix((2, 3)))
W, errors = self.nmf.fit_transform(X, return_errors=True)
self.assertTrue(errors[-1] < errors[0] * 1.e-3)
def test_no_compenents_update(self):
components = random_NN_matrix((3, 5))
self.nmf.components_ = components
self.nmf.fit_transform(random_NN_matrix((10, 5)), components,
self.assertTrue((self.nmf.components_ == components).all())
class TestSparseDot(unittest.TestCase):
def setUp(self):
self.ref = sp.rand(5, 6, .3).tocsr()
self.a = np.random.random((5, 7))
self.b = np.random.random((7, 6))
def test_indices(self):
"""Test that returned sparse matrix has same structure than refmat.
ab = _special_sparse_dot(self.a, self.b, self.ref)
self.assertTrue((ab.indptr == self.ref.indptr).all()
and (ab.indices == self.ref.indices).all())
def test_correct(self):
ok = np.multiply(, self.b), (self.ref.todense() != 0))
ans = _special_sparse_dot(self.a, self.b, self.ref).todense()
assert_array_almost_equal(ans, ok)
