sklearn GMM suitable for prediction at the last stage of pipeline
Gaussian Mixture Models.
This implementation corresponds to frequentist (non-Bayesian) formulation
of Gaussian Mixture Models.
# Author: Ron Weiss <>
# Fabian Pedregosa <>
# Bertrand Thirion <>
import numpy as np
from ..base import BaseEstimator
from ..utils import check_random_state
from ..utils.extmath import logsumexp, pinvh
from .. import cluster
EPS = np.finfo(float).eps
def log_multivariate_normal_density(X, means, covars, covariance_type='diag'):
"""Compute the log probability under a multivariate Gaussian distribution.
X : array_like, shape (n_samples, n_features)
List of n_features-dimensional data points. Each row corresponds to a
single data point.
means : array_like, shape (n_components, n_features)
List of n_features-dimensional mean vectors for n_components Gaussians.
Each row corresponds to a single mean vector.
covars : array_like
List of n_components covariance parameters for each Gaussian. The shape
depends on `covariance_type`:
(n_components, n_features) if 'spherical',
(n_features, n_features) if 'tied',
(n_components, n_features) if 'diag',
(n_components, n_features, n_features) if 'full'
covariance_type : string
Type of the covariance parameters. Must be one of
'spherical', 'tied', 'diag', 'full'. Defaults to 'diag'.
lpr : array_like, shape (n_samples, n_components)
Array containing the log probabilities of each data point in
X under each of the n_components multivariate Gaussian distributions.
log_multivariate_normal_density_dict = {
'spherical': _log_multivariate_normal_density_spherical,
'tied': _log_multivariate_normal_density_tied,
'diag': _log_multivariate_normal_density_diag,
'full': _log_multivariate_normal_density_full}
return log_multivariate_normal_density_dict[covariance_type](
X, means, covars)
def sample_gaussian(mean, covar, covariance_type='diag', n_samples=1,
"""Generate random samples from a Gaussian distribution.
mean : array_like, shape (n_features,)
Mean of the distribution.
covars : array_like, optional
Covariance of the distribution. The shape depends on `covariance_type`:
scalar if 'spherical',
(n_features) if 'diag',
(n_features, n_features) if 'tied', or 'full'
covariance_type : string, optional
Type of the covariance parameters. Must be one of
'spherical', 'tied', 'diag', 'full'. Defaults to 'diag'.
n_samples : int, optional
Number of samples to generate. Defaults to 1.
X : array, shape (n_features, n_samples)
Randomly generated sample
rng = check_random_state(random_state)
n_dim = len(mean)
rand = rng.randn(n_dim, n_samples)
if n_samples == 1:
rand.shape = (n_dim,)
if covariance_type == 'spherical':
rand *= np.sqrt(covar)
elif covariance_type == 'diag':
rand =, rand)
from scipy import linalg
U, s, V = linalg.svd(covar)
sqrtS = np.diag(np.sqrt(s))
sqrt_covar =,, V))
rand =, rand)
return (rand.T + mean).T
class GMM(BaseEstimator):
"""Gaussian Mixture Model
Representation of a Gaussian mixture model probability distribution.
This class allows for easy evaluation of, sampling from, and
maximum-likelihood estimation of the parameters of a GMM distribution.
Initializes parameters such that every mixture component has zero
mean and identity covariance.
n_components : int, optional
Number of mixture components. Defaults to 1.
covariance_type : string, optional
String describing the type of covariance parameters to
use. Must be one of 'spherical', 'tied', 'diag', 'full'.
Defaults to 'diag'.
random_state: RandomState or an int seed (0 by default)
A random number generator instance
min_covar : float, optional
Floor on the diagonal of the covariance matrix to prevent
overfitting. Defaults to 1e-3.
thresh : float, optional
Convergence threshold.
n_iter : int, optional
Number of EM iterations to perform.
n_init : int, optional
Number of initializations to perform. the best results is kept
params : string, optional
Controls which parameters are updated in the training
process. Can contain any combination of 'w' for weights,
'm' for means, and 'c' for covars. Defaults to 'wmc'.
init_params : string, optional
Controls which parameters are updated in the initialization
process. Can contain any combination of 'w' for weights,
'm' for means, and 'c' for covars. Defaults to 'wmc'.
`weights_` : array, shape (`n_components`,)
This attribute stores the mixing weights for each mixture component.
`means_` : array, shape (`n_components`, `n_features`)
Mean parameters for each mixture component.
`covars_` : array
Covariance parameters for each mixture component. The shape
depends on `covariance_type`::
(n_components, n_features) if 'spherical',
(n_features, n_features) if 'tied',
(n_components, n_features) if 'diag',
(n_components, n_features, n_features) if 'full'
`converged_` : bool
True when convergence was reached in fit(), False otherwise.
See Also
DPGMM : Ininite gaussian mixture model, using the dirichlet
process, fit with a variational algorithm
VBGMM : Finite gaussian mixture model fit with a variational
algorithm, better for situations where there might be too little
data to get a good estimate of the covariance matrix.
>>> import numpy as np
>>> from sklearn import mixture
>>> np.random.seed(1)
>>> g = mixture.GMM(n_components=2)
>>> # Generate random observations with two modes centered on 0
>>> # and 10 to use for training.
>>> obs = np.concatenate((np.random.randn(100, 1),
... 10 + np.random.randn(300, 1)))
GMM(covariance_type='diag', init_params='wmc', min_covar=0.001,
n_components=2, n_init=1, n_iter=100, params='wmc',
random_state=None, thresh=0.01)
>>> np.round(g.weights_, 2)
array([ 0.75, 0.25])
>>> np.round(g.means_, 2)
array([[ 10.05],
[ 0.06]])
>>> np.round(g.covars_, 2) #doctest: +SKIP
array([[[ 1.02]],
[[ 0.96]]])
>>> g.predict([[0], [2], [9], [10]]) #doctest: +ELLIPSIS
array([1, 1, 0, 0]...)
>>> np.round(g.score([[0], [2], [9], [10]]), 2)
array([-2.19, -4.58, -1.75, -1.21])
>>> # Refit the model on new data (initial parameters remain the
>>> # same), this time with an even split between the two modes.
>>> * [[0]] + 20 * [[10]]) # doctest: +NORMALIZE_WHITESPACE
GMM(covariance_type='diag', init_params='wmc', min_covar=0.001,
n_components=2, n_init=1, n_iter=100, params='wmc',
random_state=None, thresh=0.01)
>>> np.round(g.weights_, 2)
array([ 0.5, 0.5])
def __init__(self, n_components=1, covariance_type='diag',
random_state=None, thresh=1e-2, min_covar=1e-3,
n_iter=100, n_init=1, params='wmc', init_params='wmc'):
self.n_components = n_components
self.covariance_type = covariance_type
self.thresh = thresh
self.min_covar = min_covar
self.random_state = random_state
self.n_iter = n_iter
self.n_init = n_init
self.params = params
self.init_params = init_params
if not covariance_type in ['spherical', 'tied', 'diag', 'full']:
raise ValueError('Invalid value for covariance_type: %s' %
if n_init < 1:
raise ValueError('GMM estimation requires at least one run')
self.weights_ = np.ones(self.n_components) / self.n_components
# flag to indicate exit status of fit() method: converged (True) or
# n_iter reached (False)
self.converged_ = False
def _get_covars(self):
"""Covariance parameters for each mixture component.
The shape depends on `cvtype`::
(`n_states`, 'n_features') if 'spherical',
(`n_features`, `n_features`) if 'tied',
(`n_states`, `n_features`) if 'diag',
(`n_states`, `n_features`, `n_features`) if 'full'
if self.covariance_type == 'full':
return self.covars_
elif self.covariance_type == 'diag':
return [np.diag(cov) for cov in self.covars_]
elif self.covariance_type == 'tied':
return [self.covars_] * self.n_components
elif self.covariance_type == 'spherical':
return [np.diag(cov) for cov in self.covars_]
def _set_covars(self, covars):
"""Provide values for covariance"""
covars = np.asarray(covars)
_validate_covars(covars, self.covariance_type, self.n_components)
self.covars_ = covars
def eval(self, X):
"""Evaluate the model on data
Compute the log probability of X under the model and
return the posterior distribution (responsibilities) of each
mixture component for each element of X.
X: array_like, shape (n_samples, n_features)
List of n_features-dimensional data points. Each row
corresponds to a single data point.
logprob: array_like, shape (n_samples,)
Log probabilities of each data point in X
responsibilities: array_like, shape (n_samples, n_components)
Posterior probabilities of each mixture component for each
X = np.asarray(X)
if X.ndim == 1:
X = X[:, np.newaxis]
if X.size == 0:
return np.array([]), np.empty((0, self.n_components))
if X.shape[1] != self.means_.shape[1]:
raise ValueError('the shape of X is not compatible with self')
lpr = (log_multivariate_normal_density(X, self.means_, self.covars_,
+ np.log(self.weights_))
logprob = logsumexp(lpr, axis=1)
responsibilities = np.exp(lpr - logprob[:, np.newaxis])
return logprob, responsibilities
def score(self, X):
"""Compute the log probability under the model.
X : array_like, shape (n_samples, n_features)
List of n_features-dimensional data points. Each row
corresponds to a single data point.
logprob : array_like, shape (n_samples,)
Log probabilities of each data point in X
logprob, _ = self.eval(X)
return logprob
def predict(self, X):
"""Predict label for data.
X : array-like, shape = [n_samples, n_features]
C : array, shape = (n_samples,)
logprob, responsibilities = self.eval(X)
return responsibilities.argmax(axis=1)
def predict_proba(self, X):
"""Predict posterior probability of data under each Gaussian
in the model.
X : array-like, shape = [n_samples, n_features]
responsibilities : array-like, shape = (n_samples, n_components)
Returns the probability of the sample for each Gaussian
(state) in the model.
logprob, responsibilities = self.eval(X)
return responsibilities
def sample(self, n_samples=1, random_state=None):
"""Generate random samples from the model.
n_samples : int, optional
Number of samples to generate. Defaults to 1.
X : array_like, shape (n_samples, n_features)
List of samples
if random_state is None:
random_state = self.random_state
random_state = check_random_state(random_state)
weight_cdf = np.cumsum(self.weights_)
X = np.empty((n_samples, self.means_.shape[1]))
rand = random_state.rand(n_samples)
# decide which component to use for each sample
comps = weight_cdf.searchsorted(rand)
# for each component, generate all needed samples
for comp in xrange(self.n_components):
# occurrences of current component in X
comp_in_X = (comp == comps)
# number of those occurrences
num_comp_in_X = comp_in_X.sum()
if num_comp_in_X > 0:
if self.covariance_type == 'tied':
cv = self.covars_
elif self.covariance_type == 'spherical':
cv = self.covars_[comp][0]
cv = self.covars_[comp]
X[comp_in_X] = sample_gaussian(
self.means_[comp], cv, self.covariance_type,
num_comp_in_X, random_state=random_state).T
return X
def fit(self, X, y = None):
"""Estimate model parameters with the expectation-maximization
A initialization step is performed before entering the em
algorithm. If you want to avoid this step, set the keyword
argument init_params to the empty string '' when creating the
GMM object. Likewise, if you would like just to do an
initialization, set n_iter=0.
X : array_like, shape (n, n_features)
List of n_features-dimensional data points. Each row
corresponds to a single data point.
y : (optional) array, shape = [n]
if provided 'm' in init_params will be ignored
n_components and _means will be re-calculated
## initialization step
X = np.asarray(X, dtype=np.float)
if X.ndim == 1:
X = X[:, np.newaxis]
if y is not None:
n_classes = len(np.unique(y))
self.n_components = n_classes
if X.shape[0] < self.n_components:
raise ValueError(
'GMM estimation with %s components, but got only %s samples' %
(self.n_components, X.shape[0]))
max_log_prob = -np.infty
for _ in range(self.n_init):
if y is not None:
self.means_ = np.array([X[y == i, :].mean(axis=0) for i in np.unique(y)])
elif 'm' in self.init_params or not hasattr(self, 'means_'):
self.means_ = cluster.KMeans(
if 'w' in self.init_params or not hasattr(self, 'weights_'):
self.weights_ = np.tile(1.0 / self.n_components,
if 'c' in self.init_params or not hasattr(self, 'covars_'):
cv = np.cov(X.T) + self.min_covar * np.eye(X.shape[1])
if not cv.shape:
cv.shape = (1, 1)
self.covars_ = \
cv, self.covariance_type, self.n_components)
# EM algorithms
log_likelihood = []
# reset self.converged_ to False
self.converged_ = False
for i in xrange(self.n_iter):
# Expectation step
curr_log_likelihood, responsibilities = self.eval(X)
# Check for convergence.
if i > 0 and abs(log_likelihood[-1] - log_likelihood[-2]) < \
self.converged_ = True
# Maximization step
self._do_mstep(X, responsibilities, self.params,
# if the results are better, keep it
if self.n_iter:
if log_likelihood[-1] > max_log_prob:
max_log_prob = log_likelihood[-1]
best_params = {'weights': self.weights_,
'means': self.means_,
'covars': self.covars_}
# check the existence of an init param that was not subject to
# likelihood computation issue.
if np.isneginf(max_log_prob) and self.n_iter:
raise RuntimeError(
"EM algorithm was never able to compute a valid likelihood " +
"given initial parameters. Try different init parameters " +
"(or increasing n_init) or check for degenerate data.")
# self.n_iter == 0 occurs when using GMM within HMM
if self.n_iter:
self.covars_ = best_params['covars']
self.means_ = best_params['means']
self.weights_ = best_params['weights']
return self
def _do_mstep(self, X, responsibilities, params, min_covar=0):
""" Perform the Mstep of the EM algorithm and return the class weihgts.
weights = responsibilities.sum(axis=0)
weighted_X_sum =, X)
inverse_weights = 1.0 / (weights[:, np.newaxis] + 10 * EPS)
if 'w' in params:
self.weights_ = (weights / (weights.sum() + 10 * EPS) + EPS)
if 'm' in params:
self.means_ = weighted_X_sum * inverse_weights
if 'c' in params:
covar_mstep_func = _covar_mstep_funcs[self.covariance_type]
self.covars_ = covar_mstep_func(
self, X, responsibilities, weighted_X_sum, inverse_weights,
return weights
def _n_parameters(self):
"""Return the number of free parameters in the model."""
ndim = self.means_.shape[1]
if self.covariance_type == 'full':
cov_params = self.n_components * ndim * (ndim + 1) / 2.
elif self.covariance_type == 'diag':
cov_params = self.n_components * ndim
elif self.covariance_type == 'tied':
cov_params = ndim * (ndim + 1) / 2.
elif self.covariance_type == 'spherical':
cov_params = self.n_components
mean_params = ndim * self.n_components
return int(cov_params + mean_params + self.n_components - 1)
def bic(self, X):
"""Bayesian information criterion for the current model fit
and the proposed data
X : array of shape(n_samples, n_dimensions)
bic: float (the lower the better)
return (-2 * self.score(X).sum() +
self._n_parameters() * np.log(X.shape[0]))
def aic(self, X):
"""Akaike information criterion for the current model fit
and the proposed data
X : array of shape(n_samples, n_dimensions)
aic: float (the lower the better)
return - 2 * self.score(X).sum() + 2 * self._n_parameters()
## some helper routines
def _log_multivariate_normal_density_diag(X, means=0.0, covars=1.0):
"""Compute Gaussian log-density at X for a diagonal model"""
n_samples, n_dim = X.shape
lpr = -0.5 * (n_dim * np.log(2 * np.pi) + np.sum(np.log(covars), 1)
+ np.sum((means ** 2) / covars, 1)
- 2 *, (means / covars).T)
+ ** 2, (1.0 / covars).T))
return lpr
def _log_multivariate_normal_density_spherical(X, means=0.0, covars=1.0):
"""Compute Gaussian log-density at X for a spherical model"""
cv = covars.copy()
if covars.ndim == 1:
cv = cv[:, np.newaxis]
if covars.shape[1] == 1:
cv = np.tile(cv, (1, X.shape[-1]))
return _log_multivariate_normal_density_diag(X, means, cv)
def _log_multivariate_normal_density_tied(X, means, covars):
"""Compute Gaussian log-density at X for a tied model"""
from scipy import linalg
n_samples, n_dim = X.shape
icv = pinvh(covars)
lpr = -0.5 * (n_dim * np.log(2 * np.pi) + np.log(linalg.det(covars) + 0.1)
+ np.sum(X *, icv), 1)[:, np.newaxis]
- 2 *, icv), means.T)
+ np.sum(means *, icv), 1))
return lpr
def _log_multivariate_normal_density_full(X, means, covars, min_covar=1.e-7):
"""Log probability for full covariance matrices.
from scipy import linalg
import itertools
if hasattr(linalg, 'solve_triangular'):
# only in scipy since 0.9
solve_triangular = linalg.solve_triangular
# slower, but works
solve_triangular = linalg.solve
n_samples, n_dim = X.shape
nmix = len(means)
log_prob = np.empty((n_samples, nmix))
for c, (mu, cv) in enumerate(itertools.izip(means, covars)):
cv_chol = linalg.cholesky(cv, lower=True)
except linalg.LinAlgError:
# The model is most probabily stuck in a component with too
# few observations, we need to reinitialize this components
cv_chol = linalg.cholesky(cv + min_covar * np.eye(n_dim),
cv_log_det = 2 * np.sum(np.log(np.diagonal(cv_chol)))
cv_sol = solve_triangular(cv_chol, (X - mu).T, lower=True).T
log_prob[:, c] = - .5 * (np.sum(cv_sol ** 2, axis=1) +
n_dim * np.log(2 * np.pi) + cv_log_det)
return log_prob
def _validate_covars(covars, covariance_type, n_components):
"""Do basic checks on matrix covariance sizes and values
from scipy import linalg
if covariance_type == 'spherical':
if len(covars) != n_components:
raise ValueError("'spherical' covars have length n_components")
elif np.any(covars <= 0):
raise ValueError("'spherical' covars must be non-negative")
elif covariance_type == 'tied':
if covars.shape[0] != covars.shape[1]:
raise ValueError("'tied' covars must have shape (n_dim, n_dim)")
elif (not np.allclose(covars, covars.T)
or np.any(linalg.eigvalsh(covars) <= 0)):
raise ValueError("'tied' covars must be symmetric, "
elif covariance_type == 'diag':
if len(covars.shape) != 2:
raise ValueError("'diag' covars must have shape"
"(n_components, n_dim)")
elif np.any(covars <= 0):
raise ValueError("'diag' covars must be non-negative")
elif covariance_type == 'full':
if len(covars.shape) != 3:
raise ValueError("'full' covars must have shape "
"(n_components, n_dim, n_dim)")
elif covars.shape[1] != covars.shape[2]:
raise ValueError("'full' covars must have shape "
"(n_components, n_dim, n_dim)")
for n, cv in enumerate(covars):
if (not np.allclose(cv, cv.T)
or np.any(linalg.eigvalsh(cv) <= 0)):
raise ValueError("component %d of 'full' covars must be "
"symmetric, positive-definite" % n)
raise ValueError("covariance_type must be one of " +
"'spherical', 'tied', 'diag', 'full'")
def distribute_covar_matrix_to_match_covariance_type(
tied_cv, covariance_type, n_components):
"""Create all the covariance matrices from a given template
if covariance_type == 'spherical':
cv = np.tile(tied_cv.mean() * np.ones(tied_cv.shape[1]),
(n_components, 1))
elif covariance_type == 'tied':
cv = tied_cv
elif covariance_type == 'diag':
cv = np.tile(np.diag(tied_cv), (n_components, 1))
elif covariance_type == 'full':
cv = np.tile(tied_cv, (n_components, 1, 1))
raise ValueError("covariance_type must be one of " +
"'spherical', 'tied', 'diag', 'full'")
return cv
def _covar_mstep_diag(gmm, X, responsibilities, weighted_X_sum, norm,
"""Performing the covariance M step for diagonal cases"""
avg_X2 =, X * X) * norm
avg_means2 = gmm.means_ ** 2
avg_X_means = gmm.means_ * weighted_X_sum * norm
return avg_X2 - 2 * avg_X_means + avg_means2 + min_covar
def _covar_mstep_spherical(*args):
"""Performing the covariance M step for spherical cases"""
cv = _covar_mstep_diag(*args)
return np.tile(cv.mean(axis=1)[:, np.newaxis], (1, cv.shape[1]))
def _covar_mstep_full(gmm, X, responsibilities, weighted_X_sum, norm,
"""Performing the covariance M step for full cases"""
# Eq. 12 from K. Murphy, "Fitting a Conditional Linear Gaussian
# Distribution"
n_features = X.shape[1]
cv = np.empty((gmm.n_components, n_features, n_features))
for c in xrange(gmm.n_components):
post = responsibilities[:, c]
# Underflow Errors in doing post * X.T are not important
avg_cv = * X.T, X) / (post.sum() + 10 * EPS)
mu = gmm.means_[c][np.newaxis]
cv[c] = (avg_cv -, mu) + min_covar * np.eye(n_features))
return cv
def _covar_mstep_tied(gmm, X, responsibilities, weighted_X_sum, norm,
# Eq. 15 from K. Murphy, "Fitting a Conditional Linear Gaussian
n_features = X.shape[1]
avg_X2 =, X)
avg_means2 =, weighted_X_sum)
return (avg_X2 - avg_means2 + min_covar * np.eye(n_features)) / X.shape[0]
_covar_mstep_funcs = {'spherical': _covar_mstep_spherical,
'diag': _covar_mstep_diag,
'tied': _covar_mstep_tied,
'full': _covar_mstep_full,
