Skip to content

Instantly share code, notes, and snippets.

@tmylk
Last active May 2, 2016 22:08
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tmylk/4da77c179adf7075ab09 to your computer and use it in GitHub Desktop.
Save tmylk/4da77c179adf7075ab09 to your computer and use it in GitHub Desktop.
heavily logged versions of LDA in sklearn and gensim to enable comparison
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2011 Radim Rehurek <radimrehurek@seznam.cz>
# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html
#
# Parts of the LDA inference code come from Dr. Hoffman's `onlineldavb.py` script,
# (C) 2010 Matthew D. Hoffman, GNU GPL 3.0
"""
**For a faster implementation of LDA (parallelized for multicore machines), see** :mod:`gensim.models.ldamulticore`.
Latent Dirichlet Allocation (LDA) in Python.
This module allows both LDA model estimation from a training corpus and inference of topic
distribution on new, unseen documents. The model can also be updated with new documents
for online training.
The core estimation code is based on the `onlineldavb.py` script by M. Hoffman [1]_, see
**Hoffman, Blei, Bach: Online Learning for Latent Dirichlet Allocation, NIPS 2010.**
The algorithm:
* is **streamed**: training documents may come in sequentially, no random access required,
* runs in **constant memory** w.r.t. the number of documents: size of the
training corpus does not affect memory footprint, can process corpora larger than RAM, and
* is **distributed**: makes use of a cluster of machines, if available, to
speed up model estimation.
.. [1] http://www.cs.princeton.edu/~mdhoffma
"""
import logging
import numpy # for arrays, array broadcasting etc.
from gensim import interfaces, utils, matutils
from itertools import chain
from scipy.special import gammaln, psi # gamma function utils
from scipy.special import polygamma
from six.moves import xrange
import six
# log(sum(exp(x))) that tries to avoid overflow
try:
# try importing from here if older scipy is installed
from scipy.maxentropy import logsumexp
except ImportError:
# maxentropy has been removed in recent releases, logsumexp now in misc
from scipy.misc import logsumexp
logger = logging.getLogger('gensim.models.ldamodel')
EPS = numpy.finfo(numpy.float).eps
def dirichlet_expectation(alpha):
"""
For a vector `theta~Dir(alpha)`, compute `E[log(theta)]`.
"""
if (len(alpha.shape) == 1):
result = psi(alpha) - psi(numpy.sum(alpha))
else:
result = psi(alpha) - psi(numpy.sum(alpha, 1))[:, numpy.newaxis]
return result.astype(alpha.dtype) # keep the same precision as input
class LdaState(utils.SaveLoad):
"""
Encapsulate information for distributed computation of LdaModel objects.
Objects of this class are sent over the network, so try to keep them lean to
reduce traffic.
"""
def __init__(self, eta, shape):
self.eta = eta
self.sstats = numpy.zeros(shape)
self.numdocs = 0
def reset(self):
"""
Prepare the state for a new EM iteration (reset sufficient stats).
"""
self.sstats[:] = 0.0
self.numdocs = 0
def merge(self, other):
"""
Merge the result of an E step from one node with that of another node
(summing up sufficient statistics).
The merging is trivial and after merging all cluster nodes, we have the
exact same result as if the computation was run on a single node (no
approximation).
"""
assert other is not None
self.sstats += other.sstats
self.numdocs += other.numdocs
def blend(self, rhot, other, targetsize=None):
"""
Given LdaState `other`, merge it with the current state. Stretch both to
`targetsize` documents before merging, so that they are of comparable
magnitude.
Merging is done by average weighting: in the extremes, `rhot=0.0` means
`other` is completely ignored; `rhot=1.0` means `self` is completely ignored.
This procedure corresponds to the stochastic gradient update from Hoffman
et al., algorithm 2 (eq. 14).
"""
assert other is not None
if targetsize is None:
targetsize = self.numdocs
# stretch the current model's expected n*phi counts to target size
if self.numdocs == 0 or targetsize == self.numdocs:
scale = 1.0
else:
scale = 1.0 * targetsize / self.numdocs
self.sstats *= (1.0 - rhot) * scale
# stretch the incoming n*phi counts to target size
if other.numdocs == 0 or targetsize == other.numdocs:
scale = 1.0
else:
logger.info("merging changes from %i documents into a model of %i documents",
other.numdocs, targetsize)
scale = 1.0 * targetsize / other.numdocs
self.sstats += rhot * scale * other.sstats
self.numdocs = targetsize
def blend2(self, rhot, other, targetsize=None):
"""
Alternative, more simple blend.
"""
assert other is not None
if targetsize is None:
targetsize = self.numdocs
# merge the two matrices by summing
self.sstats += other.sstats
self.numdocs = targetsize
def get_lambda(self):
return self.eta + self.sstats
def get_Elogbeta(self):
return dirichlet_expectation(self.get_lambda())
# endclass LdaState
class LdaModel(interfaces.TransformationABC):
"""
The constructor estimates Latent Dirichlet Allocation model parameters based
on a training corpus:
>>> lda = LdaModel(corpus, num_topics=10)
You can then infer topic distributions on new, unseen documents, with
>>> doc_lda = lda[doc_bow]
The model can be updated (trained) with new documents via
>>> lda.update(other_corpus)
Model persistency is achieved through its `load`/`save` methods.
"""
def __init__(self, corpus=None, num_topics=100, id2word=None,
distributed=False, chunksize=2000, passes=1, update_every=1,
alpha='symmetric', eta=None, decay=0.5, offset=1.0,
eval_every=10, iterations=50, gamma_threshold=0.001,
minimum_probability=0.01):
"""
If given, start training from the iterable `corpus` straight away. If not given,
the model is left untrained (presumably because you want to call `update()` manually).
`num_topics` is the number of requested latent topics to be extracted from
the training corpus.
`id2word` is a mapping from word ids (integers) to words (strings). It is
used to determine the vocabulary size, as well as for debugging and topic
printing.
`alpha` and `eta` are hyperparameters that affect sparsity of the document-topic
(theta) and topic-word (lambda) distributions. Both default to a symmetric
1.0/num_topics prior.
`alpha` can be set to an explicit array = prior of your choice. It also
support special values of 'asymmetric' and 'auto': the former uses a fixed
normalized asymmetric 1.0/topicno prior, the latter learns an asymmetric
prior directly from your data.
`eta` can be a scalar for a symmetric prior over topic/word
distributions, or a matrix of shape num_topics x num_words,
which can be used to impose asymmetric priors over the word
distribution on a per-topic basis. This may be useful if you
want to seed certain topics with particular words by boosting
the priors for those words.
Turn on `distributed` to force distributed computing (see the `web tutorial <http://radimrehurek.com/gensim/distributed.html>`_
on how to set up a cluster of machines for gensim).
Calculate and log perplexity estimate from the latest mini-batch every
`eval_every` model updates (setting this to 1 slows down training ~2x;
default is 10 for better performance). Set to None to disable perplexity estimation.
`decay` and `offset` parameters are the same as Kappa and Tau_0 in
Hoffman et al, respectively.
`minimum_probability` controls filtering the topics returned for a document (bow).
Example:
>>> lda = LdaModel(corpus, num_topics=100) # train model
>>> print(lda[doc_bow]) # get topic probability distribution for a document
>>> lda.update(corpus2) # update the LDA model with additional documents
>>> print(lda[doc_bow])
>>> lda = LdaModel(corpus, num_topics=50, alpha='auto', eval_every=5) # train asymmetric alpha from data
"""
# store user-supplied parameters
self.id2word = id2word
if corpus is None and self.id2word is None:
raise ValueError('at least one of corpus/id2word must be specified, to establish input space dimensionality')
if self.id2word is None:
logger.warning("no word id mapping provided; initializing from corpus, assuming identity")
self.id2word = utils.dict_from_corpus(corpus)
self.num_terms = len(self.id2word)
elif len(self.id2word) > 0:
self.num_terms = 1 + max(self.id2word.keys())
else:
self.num_terms = 0
if self.num_terms == 0:
raise ValueError("cannot compute LDA over an empty collection (no terms)")
self.distributed = bool(distributed)
self.num_topics = int(num_topics)
self.chunksize = chunksize
self.decay = decay
self.offset = offset
self.minimum_probability = minimum_probability
self.num_updates = 0
self.passes = passes
self.update_every = update_every
self.eval_every = eval_every
self.optimize_alpha = alpha == 'auto'
if alpha == 'symmetric' or alpha is None:
logger.info("using symmetric alpha at %s", 1.0 / num_topics)
self.alpha = numpy.asarray([1.0 / num_topics for i in xrange(num_topics)])
elif alpha == 'asymmetric':
self.alpha = numpy.asarray([1.0 / (i + numpy.sqrt(num_topics)) for i in xrange(num_topics)])
self.alpha /= self.alpha.sum()
logger.info("using asymmetric alpha %s", list(self.alpha))
elif alpha == 'auto':
self.alpha = numpy.asarray([1.0 / num_topics for i in xrange(num_topics)])
logger.info("using autotuned alpha, starting with %s", list(self.alpha))
else:
# must be either float or an array of floats, of size num_topics
self.alpha = alpha if isinstance(alpha, numpy.ndarray) else numpy.asarray([alpha] * num_topics)
if len(self.alpha) != num_topics:
raise RuntimeError("invalid alpha shape (must match num_topics)")
if eta is None:
self.eta = 1.0 / num_topics
else:
self.eta = eta
# VB constants
self.iterations = iterations
self.gamma_threshold = gamma_threshold
# set up distributed environment if necessary
if not distributed:
logger.info("using serial LDA version on this node")
self.dispatcher = None
self.numworkers = 1
else:
if self.optimize_alpha:
raise NotImplementedError("auto-optimizing alpha not implemented in distributed LDA")
# set up distributed version
try:
import Pyro4
dispatcher = Pyro4.Proxy('PYRONAME:gensim.lda_dispatcher')
logger.debug("looking for dispatcher at %s" % str(dispatcher._pyroUri))
dispatcher.initialize(id2word=self.id2word, num_topics=num_topics,
chunksize=chunksize, alpha=alpha, eta=eta, distributed=False)
self.dispatcher = dispatcher
self.numworkers = len(dispatcher.getworkers())
logger.info("using distributed version with %i workers" % self.numworkers)
except Exception as err:
logger.error("failed to initialize distributed LDA (%s)", err)
raise RuntimeError("failed to initialize distributed LDA (%s)" % err)
# Initialize the variational distribution q(beta|lambda)
self.state = LdaState(self.eta, (self.num_topics, self.num_terms))
self.state.sstats = numpy.random.gamma(100., 1. / 100., (self.num_topics, self.num_terms))
if __debug__:
logger.debug("Initial sstats %s", str(self.state.sstats))
logger.debug("Initial lambda = eta + sstats is %s", str(self.state.get_lambda()))
## this adds eta which we don't want
# self.sync_state()
# !!! first change by Lev
self.expElogbeta = numpy.exp(dirichlet_expectation(self.state.sstats))
if __debug__:
logger.debug("Initial expElogbeta %s", str(self.expElogbeta))
# if a training corpus was provided, start estimating the model right away
if corpus is not None:
self.update(corpus)
def __str__(self):
return "LdaModel(num_terms=%s, num_topics=%s, decay=%s, chunksize=%s)" % \
(self.num_terms, self.num_topics, self.decay, self.chunksize)
def sync_state(self):
if __debug__:
logger.debug("syncing state: old expElogbeta: %s , eta %s, .sstats %s", self.expElogbeta, self.state.eta,\
self.state.sstats)
self.expElogbeta = numpy.exp(self.state.get_Elogbeta())
if __debug__:
logger.debug("synced state: new expElogbeta: %s", self.expElogbeta)
def clear(self):
"""Clear model state (free up some memory). Used in the distributed algo."""
self.state = None
self.Elogbeta = None
def inference(self, chunk, collect_sstats=False):
"""
Given a chunk of sparse document vectors, estimate gamma (parameters
controlling the topic weights) for each document in the chunk.
This function does not modify the model (=is read-only aka const). The
whole input chunk of document is assumed to fit in RAM; chunking of a
large corpus must be done earlier in the pipeline.
If `collect_sstats` is True, also collect sufficient statistics needed
to update the model's topic-word distributions, and return a 2-tuple
`(gamma, sstats)`. Otherwise, return `(gamma, None)`. `gamma` is of shape
`len(chunk) x self.num_topics`.
Avoids computing the `phi` variational parameter directly using the
optimization presented in **Lee, Seung: Algorithms for non-negative matrix factorization, NIPS 2001**.
"""
try:
_ = len(chunk)
except:
# convert iterators/generators to plain list, so we have len() etc.
chunk = list(chunk)
if len(chunk) > 1:
logger.debug("performing inference on a chunk of %i documents", len(chunk))
# Initialize the variational distribution q(theta|gamma) for the chunk
gamma = numpy.random.gamma(100., 1. / 100., (len(chunk), self.num_topics))
if __debug__:
logger.debug("(n_samples = %s, n_topics =%s))", len(chunk), self.num_topics)
logger.debug("Initialised doc_topic_distr aka q(theta|gamma) %s", str(gamma))
Elogtheta = dirichlet_expectation(gamma)
expElogtheta = numpy.exp(Elogtheta)
if collect_sstats:
sstats = numpy.zeros_like(self.expElogbeta)
else:
sstats = None
if __debug__:
logger.debug("Initialised sstats as %s", str(sstats))
converged = 0
# Now, for each document d update that document's gamma and phi
# Inference code copied from Hoffman's `onlineldavb.py` (esp. the
# Lee&Seung trick which speeds things up by an order of magnitude, compared
# to Blei's original LDA-C code, cool!).
for d, doc in enumerate(chunk):
ids = [id for id, _ in doc]
cts = numpy.array([cnt for _, cnt in doc])
gammad = gamma[d, :]
Elogthetad = Elogtheta[d, :]
expElogthetad = expElogtheta[d, :]
expElogbetad = self.expElogbeta[:, ids]
# The optimal phi_{dwk} is proportional to expElogthetad_k * expElogbetad_w.
# phinorm is the normalizer.
# TODO treat zeros explicitly, instead of adding 1e-100?
# !!! second change by level: adding explicit EPS to match sklearn
phinorm = numpy.dot(expElogthetad, expElogbetad) + EPS
if __debug__:
logger.debug("document %i, iteration initial, exp_doc_topic_d aka expElogthetad %s" , d, expElogthetad)
logger.debug("document %i, iteration initial, exp_topic_word_d aka expElogbetad %s" , d, expElogbetad)
logger.debug("document %i, norm_phi aka phinorm %s", d, phinorm)
# Iterate between gamma and phi until convergence
for _ in xrange(self.iterations):
lastgamma = gammad
# We represent phi implicitly to save memory and time.
# Substituting the value of the optimal phi back into
# the update for gamma gives this update. Cf. Lee&Seung 2001.
gammad = self.alpha + expElogthetad * numpy.dot(cts / phinorm, expElogbetad.T)
if __debug__:
logger.debug("document %i, iteration %i, new doc_topic_d aka gammad %s", d, _, str(gammad))
Elogthetad = dirichlet_expectation(gammad)
expElogthetad = numpy.exp(Elogthetad)
phinorm = numpy.dot(expElogthetad, expElogbetad) + EPS
if __debug__:
logger.debug("document %i, iteration %i, exp_doc_topic_d aka expElogthetad %s" , d, _, expElogthetad)
logger.debug("document %i, iteration %i, exp_topic_word_d aka expElogbetad %s" , d, _, expElogbetad)
logger.debug("document %i, iteration %i, norm_phi aka phinorm %s", d, _, phinorm)
# If gamma hasn't changed much, we're done.
meanchange = numpy.mean(abs(gammad - lastgamma))
if (meanchange < self.gamma_threshold):
converged += 1
break
gamma[d, :] = gammad
if collect_sstats:
# Contribution of document d to the expected sufficient
# statistics for the M step.
sstats[:, ids] += numpy.outer(expElogthetad.T, cts / phinorm)
if __debug__:
logger.debug("document %i, final, suff_stats aka sstats %s" , d, str(sstats))
logger.debug("document %i, final, norm_phi aka phinorm %s", d, phinorm)
logger.debug("document %i, final, norm_phi shape %s", d, phinorm.shape)
if len(chunk) > 1:
logger.debug("%i/%i documents converged within %i iterations",
converged, len(chunk), self.iterations)
if collect_sstats:
# This step finishes computing the sufficient statistics for the
# M step, so that
# sstats[k, w] = \sum_d n_{dw} * phi_{dwk}
# = \sum_d n_{dw} * exp{Elogtheta_{dk} + Elogbeta_{kw}} / phinorm_{dw}.
sstats *= self.expElogbeta
if __debug__:
logger.debug("all documents, (suff_stats aka sstats) multiplied by expElogbeta %s" , str(sstats))
return gamma, sstats
def do_estep(self, chunk, state=None):
"""
Perform inference on a chunk of documents, and accumulate the collected
sufficient statistics in `state` (or `self.state` if None).
"""
if state is None:
state = self.state
gamma, sstats = self.inference(chunk, collect_sstats=True)
state.sstats += sstats
state.numdocs += gamma.shape[0] # avoids calling len(chunk) on a generator
return gamma
def update_alpha(self, gammat, rho):
"""
Update parameters for the Dirichlet prior on the per-document
topic weights `alpha` given the last `gammat`.
Uses Newton's method, described in **Huang: Maximum Likelihood Estimation of Dirichlet Distribution Parameters.**
http://jonathan-huang.org/research/dirichlet/dirichlet.pdf
"""
N = float(len(gammat))
logphat = sum(dirichlet_expectation(gamma) for gamma in gammat) / N
dalpha = numpy.copy(self.alpha)
gradf = N * (psi(numpy.sum(self.alpha)) - psi(self.alpha) + logphat)
c = N * polygamma(1, numpy.sum(self.alpha))
q = -N * polygamma(1, self.alpha)
b = numpy.sum(gradf / q) / (1 / c + numpy.sum(1 / q))
dalpha = -(gradf - b) / q
if all(rho * dalpha + self.alpha > 0):
self.alpha += rho * dalpha
else:
logger.warning("updated alpha not positive")
logger.info("optimized alpha %s", list(self.alpha))
return self.alpha
def log_perplexity(self, chunk, total_docs=None):
"""
Calculate and return per-word likelihood bound, using the `chunk` of
documents as evaluation corpus. Also output the calculated statistics. incl.
perplexity=2^(-bound), to log at INFO level.
"""
if __debug__:
logger.debug("calculating perplexity")
if total_docs is None:
total_docs = len(chunk)
corpus_words = sum(cnt for document in chunk for _, cnt in document)
subsample_ratio = 1.0 * total_docs / len(chunk)
bound = self.bound(chunk, subsample_ratio=subsample_ratio)
word_cnt = (subsample_ratio * corpus_words)
perwordbound = bound / word_cnt
logger.info("%.3f per-word bound, %.1f perplexity estimate based on a held-out corpus of %i documents with %i words" %
(perwordbound, numpy.exp2(-perwordbound), len(chunk), corpus_words))
if __debug__:
logger.debug("bound %s, word_cnt %s, perword_bound %s", bound, word_cnt, perwordbound)
return perwordbound
def update(self, corpus, chunksize=None, decay=None, offset=None,
passes=None, update_every=None, eval_every=None, iterations=None,
gamma_threshold=None):
"""
Train the model with new documents, by EM-iterating over `corpus` until
the topics converge (or until the maximum number of allowed iterations
is reached). `corpus` must be an iterable (repeatable stream of documents),
In distributed mode, the E step is distributed over a cluster of machines.
This update also supports updating an already trained model (`self`)
with new documents from `corpus`; the two models are then merged in
proportion to the number of old vs. new documents. This feature is still
experimental for non-stationary input streams.
For stationary input (no topic drift in new documents), on the other hand,
this equals the online update of Hoffman et al. and is guaranteed to
converge for any `decay` in (0.5, 1.0>. Additionally, for smaller
`corpus` sizes, an increasing `offset` may be beneficial (see
Table 1 in Hoffman et al.)
"""
# use parameters given in constructor, unless user explicitly overrode them
if decay is None:
decay = self.decay
if offset is None:
offset = self.offset
if passes is None:
passes = self.passes
if update_every is None:
update_every = self.update_every
if eval_every is None:
eval_every = self.eval_every
if iterations is None:
iterations = self.iterations
if gamma_threshold is None:
gamma_threshold = self.gamma_threshold
try:
lencorpus = len(corpus)
except:
logger.warning("input corpus stream has no len(); counting documents")
lencorpus = sum(1 for _ in corpus)
if lencorpus == 0:
logger.warning("LdaModel.update() called with an empty corpus")
return
if chunksize is None:
chunksize = min(lencorpus, self.chunksize)
self.state.numdocs += lencorpus
if update_every:
updatetype = "online"
updateafter = min(lencorpus, update_every * self.numworkers * chunksize)
else:
updatetype = "batch"
updateafter = lencorpus
evalafter = min(lencorpus, (eval_every or 0) * self.numworkers * chunksize)
updates_per_pass = max(1, lencorpus / updateafter)
logger.info("running %s LDA training, %s topics, %i passes over "
"the supplied corpus of %i documents, updating model once "
"every %i documents, evaluating perplexity every %i documents, "
"iterating %ix with a convergence threshold of %f",
updatetype, self.num_topics, passes, lencorpus,
updateafter, evalafter, iterations,
gamma_threshold)
if updates_per_pass * passes < 10:
logger.warning("too few updates, training might not converge; consider "
"increasing the number of passes or iterations to improve accuracy")
# rho is the "speed" of updating; TODO try other fncs
# pass_ + num_updates handles increasing the starting t for each pass,
# while allowing it to "reset" on the first pass of each update
def rho():
if __debug__:
logger.debug("Rho: decay %s, offset %s, pass_%s, self.num_updates %s, chunksize %s", decay, offset, pass_, self.num_updates, chunksize )
return pow(offset + pass_ + (self.num_updates / chunksize), -decay)
for pass_ in xrange(passes):
if self.dispatcher:
logger.info('initializing %s workers' % self.numworkers)
self.dispatcher.reset(self.state)
else:
other = LdaState(self.eta, self.state.sstats.shape)
dirty = False
reallen = 0
for chunk_no, chunk in enumerate(utils.grouper(corpus, chunksize, as_numpy=True)):
reallen += len(chunk) # keep track of how many documents we've processed so far
if eval_every and ((reallen == lencorpus) or ((chunk_no + 1) % (eval_every * self.numworkers) == 0)):
self.log_perplexity(chunk, total_docs=lencorpus)
if self.dispatcher:
# add the chunk to dispatcher's job queue, so workers can munch on it
logger.info('PROGRESS: pass %i, dispatching documents up to #%i/%i',
pass_, chunk_no * chunksize + len(chunk), lencorpus)
# this will eventually block until some jobs finish, because the queue has a small finite length
self.dispatcher.putjob(chunk)
else:
logger.info('PROGRESS: pass %i, at document #%i/%i',
pass_, chunk_no * chunksize + len(chunk), lencorpus)
gammat = self.do_estep(chunk, other)
if self.optimize_alpha:
self.update_alpha(gammat, rho())
dirty = True
del chunk
# perform an M step. determine when based on update_every, don't do this after every chunk
if update_every and (chunk_no + 1) % (update_every * self.numworkers) == 0:
if self.dispatcher:
# distributed mode: wait for all workers to finish
logger.info("reached the end of input; now waiting for all remaining jobs to finish")
other = self.dispatcher.getstate()
self.do_mstep(rho(), other, pass_ > 0)
del other # frees up memory
if self.dispatcher:
logger.info('initializing workers')
self.dispatcher.reset(self.state)
else:
other = LdaState(self.eta, self.state.sstats.shape)
dirty = False
# endfor single corpus iteration
if reallen != lencorpus:
raise RuntimeError("input corpus size changed during training (don't use generators as input)")
if dirty:
# finish any remaining updates
if self.dispatcher:
# distributed mode: wait for all workers to finish
logger.info("reached the end of input; now waiting for all remaining jobs to finish")
other = self.dispatcher.getstate()
self.do_mstep(rho(), other, pass_ > 0)
del other
dirty = False
# endfor entire corpus update
def do_mstep(self, rho, other, extra_pass=False):
"""
M step: use linear interpolation between the existing topics and
collected sufficient statistics in `other` to update the topics.
"""
if __debug__:
logger.debug("Before M-step: components aka lambda %s", str(self.state.get_lambda()))
logger.debug("Before m step: expElogbeta %s", str(self.expElogbeta))
logger.debug("Before m step: topic_word_prior aka eta %s", str(self.eta))
logger.debug("Before m step: new sstats %s", other.sstats)
logger.debug("Before m step: rho %s", rho)
logger.debug("updating topics")
# update self with the new blend; also keep track of how much did
# the topics change through this update, to assess convergence
diff = numpy.log(self.expElogbeta)
self.state.blend(rho, other)
if __debug__:
logger.debug("After m step: sstats %s", self.state.sstats)
logger.debug("After M-step: components aka lambda %s", str(self.state.get_lambda()))
diff -= self.state.get_Elogbeta()
self.sync_state()
if __debug__:
logger.debug("After m step: expElogbeta %s", str(self.expElogbeta))
# print out some debug info at the end of each EM iteration
self.print_topics(5)
logger.info("topic diff=%f, rho=%f", numpy.mean(numpy.abs(diff)), rho)
if not extra_pass:
# only update if this isn't an additional pass
self.num_updates += other.numdocs
def bound(self, corpus, gamma=None, subsample_ratio=1.0):
"""
Estimate the variational bound of documents from `corpus`:
E_q[log p(corpus)] - E_q[log q(corpus)]
`gamma` are the variational parameters on topic weights for each `corpus`
document (=2d matrix=what comes out of `inference()`).
If not supplied, will be inferred from the model.
"""
if __debug__:
logger.debug("Estimating approx bound")
score = 0.0
_lambda = self.state.get_lambda()
if __debug__:
logger.debug("component aka lambda %s", _lambda)
Elogbeta = dirichlet_expectation(_lambda)
if __debug__:
logger.debug("dirichlet_component aka Elogbeta %s", Elogbeta)
for d, doc in enumerate(corpus): # stream the input doc-by-doc, in case it's too large to fit in RAM
if d % self.chunksize == 0:
logger.debug("bound: at document #%i", d)
if gamma is None:
gammad, _ = self.inference([doc])
else:
gammad = gamma[d]
Elogthetad = dirichlet_expectation(gammad)
# E[log p(doc | theta, beta)]
score += numpy.sum(cnt * logsumexp(Elogthetad + Elogbeta[:, id]) for id, cnt in doc)
# E[log p(theta | alpha) - log q(theta | gamma)]; assumes alpha is a vector
score += numpy.sum((self.alpha - gammad) * Elogthetad)
score += numpy.sum(gammaln(gammad) - gammaln(self.alpha))
score += gammaln(numpy.sum(self.alpha)) - gammaln(numpy.sum(gammad))
if __debug__:
logger.debug("Doc_id %s, score %s", d,score)
# compensate likelihood for when `corpus` above is only a sample of the whole corpus
score *= subsample_ratio
if __debug__:
logger.debug("Multiplied by %s subsample ratio: score is %s", subsample_ratio, score)
# E[log p(beta | eta) - log q (beta | lambda)]; assumes eta is a scalar
score += numpy.sum((self.eta - _lambda) * Elogbeta)
score += numpy.sum(gammaln(_lambda) - gammaln(self.eta))
if numpy.ndim(self.eta) == 0:
sum_eta = self.eta * self.num_terms
else:
sum_eta = numpy.sum(self.eta, 1)
score += numpy.sum(gammaln(sum_eta) - gammaln(numpy.sum(_lambda, 1)))
return score
def print_topics(self, num_topics=10, num_words=10):
return self.show_topics(num_topics, num_words, log=True)
def show_topics(self, num_topics=10, num_words=10, log=False, formatted=True):
"""
For `num_topics` number of topics, return `num_words` most significant words
(10 words per topic, by default).
The topics are returned as a list -- a list of strings if `formatted` is
True, or a list of (probability, word) 2-tuples if False.
If `log` is True, also output this result to log.
Unlike LSA, there is no natural ordering between the topics in LDA.
The returned `num_topics <= self.num_topics` subset of all topics is therefore
arbitrary and may change between two LDA training runs.
"""
if num_topics < 0 or num_topics >= self.num_topics:
num_topics = self.num_topics
chosen_topics = range(num_topics)
else:
num_topics = min(num_topics, self.num_topics)
# add a little random jitter, to randomize results around the same alpha
sort_alpha = self.alpha + 0.0001 * numpy.random.rand(len(self.alpha))
sorted_topics = list(matutils.argsort(sort_alpha))
chosen_topics = sorted_topics[:num_topics // 2] + sorted_topics[-num_topics // 2:]
shown = []
for i in chosen_topics:
if formatted:
topic = self.print_topic(i, topn=num_words)
else:
topic = self.show_topic(i, topn=num_words)
shown.append(topic)
if log:
logger.info("topic #%i (%.3f): %s", i, self.alpha[i], topic)
return shown
def show_topic(self, topicid, topn=10):
"""
Return a list of `(words_probability, word)` 2-tuples for the most probable
words in topic `topicid`.
Only return 2-tuples for the topn most probable words (ignore the rest).
"""
topic = self.state.get_lambda()[topicid]
topic = topic / topic.sum() # normalize to probability distribution
bestn = matutils.argsort(topic, topn, reverse=True)
beststr = [(topic[id], self.id2word[id]) for id in bestn]
return beststr
def print_topic(self, topicid, topn=10):
"""Return the result of `show_topic`, but formatted as a single string."""
return ' + '.join(['%.3f*%s' % v for v in self.show_topic(topicid, topn)])
def top_topics(self, corpus, num_words=20):
"""
Calculate the Umass topic coherence for each topic. Algorithm from
**Mimno, Wallach, Talley, Leenders, McCallum: Optimizing Semantic Coherence in Topic Models, CEMNLP 2011.**
"""
is_corpus, corpus = utils.is_corpus(corpus)
if not is_corpus:
logger.warning("LdaModel.top_topics() called with an empty corpus")
return
topics = []
str_topics = []
for topic in self.state.get_lambda():
topic = topic / topic.sum() # normalize to probability distribution
bestn = matutils.argsort(topic, topn=num_words, reverse=True)
topics.append(bestn)
beststr = [(topic[id], self.id2word[id]) for id in bestn]
str_topics.append(beststr)
# top_ids are limited to every topics top words. should not exceed the
# vocabulary size.
top_ids = set(chain.from_iterable(topics))
# create a document occurence sparse matrix for each word
doc_word_list = {}
for id in top_ids:
id_list = set()
for n, document in enumerate(corpus):
if id in frozenset(x[0] for x in document):
id_list.add(n)
doc_word_list[id] = id_list
coherence_scores = []
for t, top_words in enumerate(topics):
# Calculate each coherence score C(t, top_words)
coherence = 0.0
# Sum of top words m=2..M
for m in top_words[1:]:
# m_docs is v_m^(t)
m_docs = doc_word_list[m]
# Sum of top words l=1..m-1
# i.e., all words ranked higher than the current word m
for l in top_words[:m - 1]:
# l_docs is v_l^(t)
l_docs = doc_word_list[l]
# co_doc_frequency is D(v_m^(t), v_l^(t))
co_doc_frequency = len(m_docs.intersection(l_docs))
# add to the coherence sum for these two words m, l
coherence += numpy.log((co_doc_frequency + 1.0) / len(l_docs))
coherence_scores.append((str_topics[t], coherence))
top_topics = sorted(coherence_scores, key=lambda t: t[1], reverse=True)
return top_topics
def get_document_topics(self, bow, minimum_probability=None):
"""
Return topic distribution for the given document `bow`, as a list of
(topic_id, topic_probability) 2-tuples.
Ignore topics with very low probability (below `minimum_probability`).
"""
if minimum_probability is None:
minimum_probability = self.minimum_probability
# if the input vector is a corpus, return a transformed corpus
is_corpus, corpus = utils.is_corpus(bow)
if is_corpus:
return self._apply(corpus)
gamma, _ = self.inference([bow])
topic_dist = gamma[0] / sum(gamma[0]) # normalize distribution
return [(topicid, topicvalue) for topicid, topicvalue in enumerate(topic_dist)
if topicvalue >= minimum_probability]
def __getitem__(self, bow, eps=None):
"""
Return topic distribution for the given document `bow`, as a list of
(topic_id, topic_probability) 2-tuples.
Ignore topics with very low probability (below `eps`).
"""
return self.get_document_topics(bow, eps)
def save(self, fname, ignore=['state', 'dispatcher'], *args, **kwargs):
"""
Save the model to file.
Large internal arrays may be stored into separate files, with `fname` as prefix.
`separately` can be used to define which arrays should be stored in separate files.
`ignore` parameter can be used to define which variables should be ignored, i.e. left
out from the pickled lda model. By default the internal `state` is ignored as it uses
its own serialisation not the one provided by `LdaModel`. The `state` and `dispatcher
will be added to any ignore parameter defined.
Note: do not save as a compressed file if you intend to load the file back with `mmap`.
Note: If you intend to use models across Python 2/3 versions there are a few things to
keep in mind:
1. The pickled Python dictionaries will not work across Python versions
2. The `save` method does not automatically save all NumPy arrays using NumPy, only
those ones that exceed `sep_limit` set in `gensim.utils.SaveLoad.save`. The main
concern here is the `alpha` array if for instance using `alpha='auto'`.
Please refer to the wiki recipes section (https://github.com/piskvorky/gensim/wiki/Recipes-&-FAQ#q9-how-do-i-load-a-model-in-python-3-that-was-trained-and-saved-using-python-2)
for an example on how to work around these issues.
"""
if self.state is not None:
self.state.save(utils.smart_extension(fname, '.state'), *args, **kwargs)
# make sure 'state' and 'dispatcher' are ignored from the pickled object, even if
# someone sets the ignore list themselves
if ignore is not None and ignore:
if isinstance(ignore, six.string_types):
ignore = [ignore]
ignore = [e for e in ignore if e] # make sure None and '' are not in the list
ignore = list(set(['state', 'dispatcher']) | set(ignore))
else:
ignore = ['state', 'dispatcher']
super(LdaModel, self).save(fname, *args, ignore=ignore, **kwargs)
@classmethod
def load(cls, fname, *args, **kwargs):
"""
Load a previously saved object from file (also see `save`).
Large arrays can be memmap'ed back as read-only (shared memory) by setting `mmap='r'`:
>>> LdaModel.load(fname, mmap='r')
"""
kwargs['mmap'] = kwargs.get('mmap', None)
result = super(LdaModel, cls).load(fname, *args, **kwargs)
state_fname = utils.smart_extension(fname, '.state')
try:
result.state = super(LdaModel, cls).load(state_fname, *args, **kwargs)
except Exception as e:
logging.warning("failed to load state from %s: %s", state_fname, e)
return result
# endclass LdaModel
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Author: Jan Zikes, Radim Rehurek
# Copyright (C) 2014 Radim Rehurek <me@radimrehurek.com>
# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html
"""
Latent Dirichlet Allocation (LDA) in Python, using all CPU cores to parallelize and
speed up model training.
The parallelization uses multiprocessing; in case this doesn't work for you for
some reason, try the :class:`gensim.models.ldamodel.LdaModel` class which is an
equivalent, but more straightforward and single-core implementation.
The training algorithm:
* is **streamed**: training documents may come in sequentially, no random access required,
* runs in **constant memory** w.r.t. the number of documents: size of the
training corpus does not affect memory footprint, can process corpora larger than RAM
Wall-clock `performance on the English Wikipedia <http://radimrehurek.com/gensim/wiki.html>`_
(2G corpus positions, 3.5M documents, 100K features, 0.54G non-zero entries in the final
bag-of-words matrix), requesting 100 topics:
====================================================== ==============
algorithm training time
====================================================== ==============
LdaMulticore(workers=1) 2h30m
LdaMulticore(workers=2) 1h24m
LdaMulticore(workers=3) 1h6m
old LdaModel() 3h44m
simply iterating over input corpus = I/O overhead 20m
====================================================== ==============
(Measured on `this i7 server <http://www.hetzner.de/en/hosting/produkte_rootserver/ex40ssd>`_
with 4 physical cores, so that optimal `workers=3`, one less than the number of cores.)
This module allows both LDA model estimation from a training corpus and inference of topic
distribution on new, unseen documents. The model can also be updated with new documents
for online training.
The core estimation code is based on the `onlineldavb.py` script by M. Hoffman [1]_, see
**Hoffman, Blei, Bach: Online Learning for Latent Dirichlet Allocation, NIPS 2010.**
.. [1] http://www.cs.princeton.edu/~mdhoffma
"""
import logging
from gensim import utils
from gensim.models.ldamodel import LdaModel, LdaState
from six.moves import queue, xrange
from multiprocessing import Pool, Queue, cpu_count
from numpy import exp
logger = logging.getLogger(__name__)
class LdaMulticore(LdaModel):
"""
The constructor estimates Latent Dirichlet Allocation model parameters based
on a training corpus:
>>> lda = LdaMulticore(corpus, num_topics=10)
You can then infer topic distributions on new, unseen documents, with
>>> doc_lda = lda[doc_bow]
The model can be updated (trained) with new documents via
>>> lda.update(other_corpus)
Model persistency is achieved through its `load`/`save` methods.
"""
def __init__(self, corpus=None, num_topics=100, id2word=None, workers=None,
chunksize=2000, passes=1, batch=False, alpha='symmetric',
eta=None, decay=0.5, offset=1.0, eval_every=10, iterations=50,
gamma_threshold=0.001, updateafter=None):
"""
If given, start training from the iterable `corpus` straight away. If not given,
the model is left untrained (presumably because you want to call `update()` manually).
`num_topics` is the number of requested latent topics to be extracted from
the training corpus.
`id2word` is a mapping from word ids (integers) to words (strings). It is
used to determine the vocabulary size, as well as for debugging and topic
printing.
`workers` is the number of extra processes to use for parallelization. Uses
all available cores by default: `workers=cpu_count()-1`. **Note**: for
hyper-threaded CPUs, `cpu_count()` returns a useless number -- set `workers`
directly to the number of your **real** cores (not hyperthreads) minus one,
for optimal performance.
If `batch` is not set, perform online training by updating the model once
every `workers * chunksize` documents (online training). Otherwise,
run batch LDA, updating model only once at the end of each full corpus pass.
`alpha` and `eta` are hyperparameters that affect sparsity of the document-topic
(theta) and topic-word (lambda) distributions. Both default to a symmetric
1.0/num_topics prior.
`alpha` can be set to an explicit array = prior of your choice. It also
support special values of 'asymmetric' and 'auto': the former uses a fixed
normalized asymmetric 1.0/topicno prior, the latter learns an asymmetric
prior directly from your data.
`eta` can be a scalar for a symmetric prior over topic/word
distributions, or a matrix of shape num_topics x num_words,
which can be used to impose asymmetric priors over the word
distribution on a per-topic basis. This may be useful if you
want to seed certain topics with particular words by boosting
the priors for those words.
Calculate and log perplexity estimate from the latest mini-batch once every
`eval_every` documents. Set to `None` to disable perplexity estimation (faster),
or to `0` to only evaluate perplexity once, at the end of each corpus pass.
`decay` and `offset` parameters are the same as Kappa and Tau_0 in
Hoffman et al, respectively.
Example:
>>> lda = LdaMulticore(corpus, id2word=id2word, num_topics=100) # train model
>>> print(lda[doc_bow]) # get topic probability distribution for a document
>>> lda.update(corpus2) # update the LDA model with additional documents
>>> print(lda[doc_bow])
"""
self.workers = max(1, cpu_count() - 1) if workers is None else workers
self.batch = batch
self.updateafter = updateafter
if alpha == 'auto':
raise NotImplementedError("auto-tuning alpha not implemented in multicore LDA; use plain LdaModel.")
super(LdaMulticore, self).__init__(corpus=corpus, num_topics=num_topics,
id2word=id2word, chunksize=chunksize, passes=passes, alpha=alpha, eta=eta,
decay=decay, offset=offset, eval_every=eval_every, iterations=iterations,
gamma_threshold=gamma_threshold)
def update(self, corpus):
"""
Train the model with new documents, by EM-iterating over `corpus` until
the topics converge (or until the maximum number of allowed iterations
is reached). `corpus` must be an iterable (repeatable stream of documents),
The E-step is distributed into the several processes.
This update also supports updating an already trained model (`self`)
with new documents from `corpus`; the two models are then merged in
proportion to the number of old vs. new documents. This feature is still
experimental for non-stationary input streams.
For stationary input (no topic drift in new documents), on the other hand,
this equals the online update of Hoffman et al. and is guaranteed to
converge for any `decay` in (0.5, 1.0>.
"""
try:
lencorpus = len(corpus)
except:
logger.warning("input corpus stream has no len(); counting documents")
lencorpus = sum(1 for _ in corpus)
if lencorpus == 0:
logger.warning("LdaMulticore.update() called with an empty corpus")
return
self.state.numdocs += lencorpus
if not self.batch:
updatetype = "online"
# !!! test by Lev, would be good to merge in to compare with sklearn
if self.updateafter is None:
self.updateafter = self.chunksize * self.workers
else:
updatetype = "batch"
self.updateafter = lencorpus
evalafter = min(lencorpus, (self.eval_every or 0) * self.updateafter)
updates_per_pass = max(1, lencorpus / self.updateafter)
logger.info("running %s LDA training, %s topics, %i passes over the"
" supplied corpus of %i documents, updating every %i documents,"
" evaluating every ~%i documents, iterating %ix with a convergence threshold of %f",
updatetype, self.num_topics, self.passes, lencorpus, self.updateafter, evalafter,
self.iterations, self.gamma_threshold)
if updates_per_pass * self.passes < 10:
logger.warning("too few updates, training might not converge; consider "
"increasing the number of passes or iterations to improve accuracy")
job_queue = Queue(maxsize=2 * self.workers)
result_queue = Queue()
# rho is the "speed" of updating; TODO try other fncs
# pass_ + num_updates handles increasing the starting t for each pass,
# while allowing it to "reset" on the first pass of each update
def rho():
if __debug__:
logger.debug("Rho: decay %s, offset %s, pass_%s, self.num_updates %s, chunksize %s", self.decay, self.offset, pass_, self.num_updates, self.chunksize )
return pow(self.offset + pass_ + (self.num_updates / self.chunksize), -self.decay)
logger.info("training LDA model using %i processes", self.workers)
pool = Pool(self.workers, worker_e_step, (job_queue, result_queue,))
for pass_ in xrange(self.passes):
queue_size, reallen = [0], 0
other = LdaState(self.eta, self.state.sstats.shape)
def process_result_queue(force=False):
"""
Clear the result queue, merging all intermediate results, and update the
LDA model if necessary.
"""
merged_new = False
while not result_queue.empty():
other.merge(result_queue.get())
queue_size[0] -= 1
merged_new = True
if __debug__:
if merged_new:
logger.debug("merged new result")
if (force and merged_new and queue_size[0] == 0) or (not self.batch and (other.numdocs >= self.updateafter)):
self.do_mstep(rho(), other, pass_ > 0)
other.reset()
if self.eval_every is not None and ((force and queue_size[0] == 0) or (self.eval_every != 0 and (self.num_updates / self.updateafter) % self.eval_every == 0)):
self.log_perplexity(chunk, total_docs=lencorpus)
chunk_stream = utils.grouper(corpus, self.chunksize, as_numpy=True)
for chunk_no, chunk in enumerate(chunk_stream):
reallen += len(chunk) # keep track of how many documents we've processed so far
# put the chunk into the workers' input job queue
chunk_put = False
while not chunk_put:
try:
job_queue.put((chunk_no, chunk, self), block=False, timeout=0.1)
chunk_put = True
queue_size[0] += 1
logger.info('PROGRESS: pass %i, dispatched chunk #%i = '
'documents up to #%i/%i, outstanding queue size %i',
pass_, chunk_no, chunk_no * self.chunksize + len(chunk), lencorpus, queue_size[0])
except queue.Full:
# in case the input job queue is full, keep clearing the
# result queue, to make sure we don't deadlock
process_result_queue()
process_result_queue()
#endfor single corpus pass
# wait for all outstanding jobs to finish
while queue_size[0] > 0:
process_result_queue(force=True)
if reallen != lencorpus:
raise RuntimeError("input corpus size changed during training (don't use generators as input)")
#endfor entire update
pool.terminate()
def worker_e_step(input_queue, result_queue):
"""
Perform E-step for each (chunk_no, chunk, model) 3-tuple from the
input queue, placing the resulting state into the result queue.
"""
logger.debug("worker process entering E-step loop")
while True:
logger.debug("getting a new job")
chunk_no, chunk, worker_lda = input_queue.get()
logger.debug("processing chunk #%i of %i documents", chunk_no, len(chunk))
worker_lda.state.reset()
worker_lda.do_estep(chunk) # TODO: auto-tune alpha?
del chunk
logger.debug("processed chunk, queuing the result")
result_queue.put(worker_lda.state)
if __debug__:
logger.debug("the result that just queued has ssstats %s", exp(worker_lda.state.sstats))
del worker_lda # free up some memory
logger.debug("result put")
"""
=============================================================
Online Latent Dirichlet Allocation with variational inference
=============================================================
This implementation is modified from Matthew D. Hoffman's onlineldavb code
Link: http://www.cs.princeton.edu/~mdhoffma/code/onlineldavb.tar
"""
# Author: Chyi-Kwei Yau
# Author: Matthew D. Hoffman (original onlineldavb implementation)
import logging
import numpy as np
import scipy.sparse as sp
from scipy.special import gammaln
from ..base import BaseEstimator, TransformerMixin
from ..utils import (check_random_state, check_array,
gen_batches, gen_even_slices, _get_n_jobs)
from ..utils.validation import NotFittedError, check_non_negative
from ..utils.extmath import logsumexp
from ..externals.joblib import Parallel, delayed
from ..externals.six.moves import xrange
from ._online_lda import (mean_change, _dirichlet_expectation_1d,
_dirichlet_expectation_2d)
EPS = np.finfo(np.float).eps
logger = logging.getLogger('sklearn.online_lda')
logger.setLevel(logging.DEBUG)
def _update_doc_distribution(X, exp_topic_word_distr, doc_topic_prior,
max_iters,
mean_change_tol, cal_sstats, random_state):
"""E-step: update document-topic distribution.
Parameters
----------
X : array-like or sparse matrix, shape=(n_samples, n_features)
Document word matrix.
exp_topic_word_distr : dense matrix, shape=(n_topics, n_features)
Exponential value of expection of log topic word distribution.
In the literature, this is `exp(E[log(beta)])`.
doc_topic_prior : float
Prior of document topic distribution `theta`.
max_iters : int
Max number of iterations for updating document topic distribution in
the E-step.
mean_change_tol : float
Stopping tolerance for updating document topic distribution in E-setp.
cal_sstats : boolean
Parameter that indicate to calculate sufficient statistics or not.
Set `cal_sstats` to `True` when we need to run M-step.
random_state : RandomState instance or None
Parameter that indicate how to initialize document topic distribution.
Set `random_state` to None will initialize document topic distribution
to a constant number.
Returns
-------
(doc_topic_distr, suff_stats) :
`doc_topic_distr` is unnormalized topic distribution for each document.
In the literature, this is `gamma`. we can calcuate `E[log(theta)]`
from it.
`suff_stats` is expected sufficient statistics for the M-step.
When `cal_sstats == False`, this will be None.
"""
is_sparse_x = sp.issparse(X)
n_samples, n_features = X.shape
n_topics = exp_topic_word_distr.shape[0]
if __debug__:
logger.debug("(n_samples = %s, n_topics =%s))", n_samples, n_topics)
if random_state:
doc_topic_distr = random_state.gamma(100., 0.01, (n_samples, n_topics))
else:
doc_topic_distr = np.ones((n_samples, n_topics))
if __debug__:
logger.debug("Initialised doc_topic_distr aka q(theta|gamma) %s", doc_topic_distr)
# In the literature, this is `exp(E[log(theta)])`
exp_doc_topic = np.exp(_dirichlet_expectation_2d(doc_topic_distr))
if __debug__:
logger.debug("Initialised exp_doc_topic aka Elogtheta %s", exp_doc_topic)
# diff on `component_` (only calculate it when `cal_diff` is True)
suff_stats = np.zeros(exp_topic_word_distr.shape) if cal_sstats else None
if __debug__:
logger.debug("Initialised suff_stats aka sstats %s", suff_stats)
if is_sparse_x:
X_data = X.data
X_indices = X.indices
X_indptr = X.indptr
for idx_d in xrange(n_samples):
if is_sparse_x:
ids = X_indices[X_indptr[idx_d]:X_indptr[idx_d + 1]]
cnts = X_data[X_indptr[idx_d]:X_indptr[idx_d + 1]]
else:
ids = np.nonzero(X[idx_d, :])[0]
cnts = X[idx_d, ids]
doc_topic_d = doc_topic_distr[idx_d, :]
exp_doc_topic_d = exp_doc_topic[idx_d, :]
exp_topic_word_d = exp_topic_word_distr[:, ids]
# Iterate between `doc_topic_d` and `norm_phi` until convergence
for _ in xrange(0, max_iters):
last_d = doc_topic_d
# The optimal phi_{dwk} is proportional to
# exp(E[log(theta_{dk})]) * exp(E[log(beta_{dw})]).
norm_phi = np.dot(exp_doc_topic_d, exp_topic_word_d) + EPS
if __debug__:
logger.debug("document %i, iteration %i, exp_doc_topic_d aka expElogthetad %s" , idx_d, _, exp_doc_topic_d)
logger.debug("document %i, iteration %i, exp_topic_word_d aka expElogbetad %s" , idx_d, _, exp_topic_word_d)
logger.debug("document %i, iteration %i, norm_phi aka phinorm %s" , idx_d, _, norm_phi)
doc_topic_d = (doc_topic_prior + exp_doc_topic_d *
np.dot(cnts / norm_phi, exp_topic_word_d.T))
if __debug__:
logger.debug("document %i, iteration %i, new doc_topic_d aka gammad %s", idx_d, _, str(doc_topic_d))
exp_doc_topic_d = _dirichlet_expectation_1d(doc_topic_d)
if __debug__:
logger.debug("document %i, iteration %i, new expElogthetad aka exp_doc_topic_d %s", idx_d, _, str(doc_topic_d))
if mean_change(last_d, doc_topic_d) < mean_change_tol:
break
doc_topic_distr[idx_d, :] = doc_topic_d
# Contribution of document d to the expected sufficient
# statistics for the M step.
if cal_sstats:
norm_phi = np.dot(exp_doc_topic_d, exp_topic_word_d) + EPS
if __debug__:
logger.debug("document %i, final, norm_phi aka phinorm %s" , idx_d, norm_phi)
logger.debug("document %i, final, norm_phi shape %s", idx_d, norm_phi.shape)
suff_stats[:, ids] += np.outer(exp_doc_topic_d.T, cnts / norm_phi)
if __debug__:
logger.debug("document %i, final, suff_stats aka sstats %s" , idx_d, str(suff_stats))
return (doc_topic_distr, suff_stats)
class LatentDirichletAllocation(BaseEstimator, TransformerMixin):
"""Latent Dirichlet Allocation with online variational Bayes algorithm
Parameters
----------
n_topics : int, optional (default=10)
Number of topics.
doc_topic_prior : float, optional (default=None)
Prior of document topic distribution `theta`. If the value is None,
defaults to `1 / n_topics`.
In the literature, this is called `alpha`.
topic_word_prior : float, optional (default=None)
Prior of topic word distribution `beta`. If the value is None, defaults
to `1 / n_topics`.
In the literature, this is called `eta`.
learning_method : 'batch' | 'online', default='online'
Method used to update `_component`. Only used in `fit` method.
In general, if the data size is large, the online update will be much
faster than the batch update.
Valid options::
'batch': Batch variational Bayes method. Use all training data in
each EM update.
Old `components_` will be overwritten in each iteration.
'online': Online variational Bayes method. In each EM update, use
mini-batch of training data to update the ``components_``
variable incrementally. The learning rate is controlled by the
``learning_decay`` and the ``learning_offset`` parameters.
learning_decay : float, optional (default=0.7)
It is a parameter that control learning rate in the online learning
method. The value should be set between (0.5, 1.0] to guarantee
asymptotic convergence. When the value is 0.0 and batch_size is
``n_samples``, the update method is same as batch learning. In the
literature, this is called kappa.
learning_offset : float, optional (default=10.)
A (positive) parameter that downweights early iterations in online
learning. It should be greater than 1.0. In the literature, this is
called tau_0.
max_iter : integer, optional (default=10)
The maximum number of iterations.
total_samples : int, optional (default=1e6)
Total number of documents. Only used in the `partial_fit` method.
batch_size : int, optional (default=128)
Number of documents to use in each EM iteration. Only used in online
learning.
evaluate_every : int optional (default=0)
How often to evaluate perplexity. Only used in `fit` method.
set it to 0 or and negative number to not evalute perplexity in
training at all. Evaluating perplexity can help you check convergence
in training process, but it will also increase total training time.
Evaluating perplexity in every iteration might increase training time
up to two-fold.
perp_tol : float, optional (default=1e-1)
Perplexity tolerance in batch learning. Only used when
``evaluate_every`` is greater than 0.
mean_change_tol : float, optional (default=1e-3)
Stopping tolerance for updating document topic distribution in E-step.
max_doc_update_iter : int (default=100)
Max number of iterations for updating document topic distribution in
the E-step.
n_jobs : int, optional (default=1)
The number of jobs to use in the E-step. If -1, all CPUs are used. For
``n_jobs`` below -1, (n_cpus + 1 + n_jobs) are used.
verbose : int, optional (default=0)
Verbosity level.
random_state : int or RandomState instance or None, optional (default=None)
Pseudo-random number generator seed control.
Attributes
----------
components_ : array, [n_topics, n_features]
Topic word distribution. ``components_[i, j]`` represents word j in
topic `i`. In the literature, this is called lambda.
n_batch_iter_ : int
Number of iterations of the EM step.
n_iter_ : int
Number of passes over the dataset.
References
----------
[1] "Online Learning for Latent Dirichlet Allocation", Matthew D. Hoffman,
David M. Blei, Francis Bach, 2010
[2] "Stochastic Variational Inference", Matthew D. Hoffman, David M. Blei,
Chong Wang, John Paisley, 2013
[3] Matthew D. Hoffman's onlineldavb code. Link:
http://www.cs.princeton.edu/~mdhoffma/code/onlineldavb.tar
"""
def __init__(self, n_topics=10, doc_topic_prior=None,
topic_word_prior=None, learning_method='online',
learning_decay=.7, learning_offset=10., max_iter=10,
batch_size=128, evaluate_every=-1, total_samples=1e6,
perp_tol=1e-1, mean_change_tol=1e-3, max_doc_update_iter=100,
n_jobs=1, verbose=0, random_state=None):
self.n_topics = n_topics
self.doc_topic_prior = doc_topic_prior
self.topic_word_prior = topic_word_prior
self.learning_method = learning_method
self.learning_decay = learning_decay
self.learning_offset = learning_offset
self.max_iter = max_iter
self.batch_size = batch_size
self.evaluate_every = evaluate_every
self.total_samples = total_samples
self.perp_tol = perp_tol
self.mean_change_tol = mean_change_tol
self.max_doc_update_iter = max_doc_update_iter
self.n_jobs = n_jobs
self.verbose = verbose
self.random_state = random_state
def _check_params(self):
"""Check model parameters."""
if self.n_topics <= 0:
raise ValueError("Invalid 'n_topics' parameter: %r"
% self.n_topics)
if self.total_samples <= 0:
raise ValueError("Invalid 'total_samples' parameter: %r"
% self.total_samples)
if self.learning_offset < 0:
raise ValueError("Invalid 'learning_offset' parameter: %r"
% self.learning_offset)
if self.learning_method not in ("batch", "online"):
raise ValueError("Invalid 'learning_method' parameter: %r"
% self.learning_method)
def _init_latent_vars(self, n_features):
"""Initialize latent variables."""
self.random_state_ = check_random_state(self.random_state)
self.n_batch_iter_ = 1
self.n_iter_ = 0
if self.doc_topic_prior is None:
self.doc_topic_prior_ = 1. / self.n_topics
else:
self.doc_topic_prior_ = self.doc_topic_prior
if self.topic_word_prior is None:
self.topic_word_prior_ = 1. / self.n_topics
else:
self.topic_word_prior_ = self.topic_word_prior
init_gamma = 100.
init_var = 1. / init_gamma
# In the literature, this is called `lambda`
self.components_ = self.random_state_.gamma(
init_gamma, init_var, (self.n_topics, n_features))
if __debug__:
logger.debug("Initial components aka lambda %s", str(self.components_))
# In the literature, this is `exp(E[log(beta)])`
self.exp_dirichlet_component_ = np.exp(
_dirichlet_expectation_2d(self.components_))
if __debug__:
logger.debug("Initial exp_dirichlet_component aka expElogbeta %s", str(self.exp_dirichlet_component_))
def _e_step(self, X, cal_sstats, random_init):
"""E-step in EM update.
Parameters
----------
X : array-like or sparse matrix, shape=(n_samples, n_features)
Document word matrix.
cal_sstats : boolean
Parameter that indicate whether to calculate sufficient statistics
or not. Set ``cal_sstats`` to True when we need to run M-step.
random_init : boolean
Parameter that indicate whether to initialize document topic
distribution randomly in the E-step. Set it to True in training
steps.
Returns
-------
(doc_topic_distr, suff_stats) :
`doc_topic_distr` is unnormailzed topic distribution for each
document. In the literature, this is called `gamma`.
`suff_stats` is expected sufficient statistics for the M-step.
When `cal_sstats == False`, it will be None.
"""
# Run e-step in parallel
n_jobs = _get_n_jobs(self.n_jobs)
random_state = self.random_state_ if random_init else None
results = Parallel(n_jobs=n_jobs, verbose=self.verbose)(
delayed(_update_doc_distribution)(X[idx_slice, :],
self.exp_dirichlet_component_,
self.doc_topic_prior_,
self.max_doc_update_iter,
self.mean_change_tol, cal_sstats,
random_state)
for idx_slice in gen_even_slices(X.shape[0], n_jobs))
# merge result
doc_topics, sstats_list = zip(*results)
doc_topic_distr = np.vstack(doc_topics)
if cal_sstats:
# This step finishes computing the sufficient statistics for the
# M-step.
suff_stats = np.zeros(self.components_.shape)
for sstats in sstats_list:
suff_stats += sstats
suff_stats *= self.exp_dirichlet_component_
if __debug__:
logger.debug("all documents, (suff_stats aka sstats) multiplied by expElogbeta %s", str(suff_stats))
else:
suff_stats = None
return (doc_topic_distr, suff_stats)
def _em_step(self, X, total_samples, batch_update):
"""EM update for 1 iteration.
update `_component` by batch VB or online VB.
Parameters
----------
X : array-like or sparse matrix, shape=(n_samples, n_features)
Document word matrix.
total_samples : integer
Total umber of documents. It is only used when
batch_update is `False`.
batch_update : boolean
Parameter that controls updating method.
`True` for batch learning, `False` for online learning.
Returns
-------
doc_topic_distr : array, shape=(n_samples, n_topics)
Unnormalized document topic distribution.
"""
# E-step
_, suff_stats = self._e_step(X, cal_sstats=True, random_init=True)
if __debug__:
logger.debug("Before M-step: components aka lambda %s", str(self.components_))
logger.debug("Before m step: expElogbeta %s", str(self.exp_dirichlet_component_))
logger.debug("Before m step: topic_word_prior aka eta %s", str(self.topic_word_prior_))
logger.debug("Before m step: new sstats %s", suff_stats)
# M-step
if batch_update:
self.components_ = self.topic_word_prior_ + suff_stats
if __debug__:
logger.debug("After M-step: components aka lambda %s", str(self.components_))
else:
# online update
# In the literature, the weight is `rho`
if __debug__:
logger.debug("Rho: decay %s, offset %s, self.batch_iter %s", self.learning_decay, self.learning_offset, self.n_batch_iter_ )
weight = np.power(self.learning_offset + self.n_batch_iter_,
-self.learning_decay)
if __debug__:
logger.debug("Before m step: rho %s", weight)
doc_ratio = float(total_samples) / X.shape[0]
self.components_ *= (1 - weight)
self.components_ += (weight * (self.topic_word_prior_
+ doc_ratio * suff_stats))
if __debug__:
logger.debug("After M-step: components aka lambda %s", self.components_)
# update `component_` related variables
self.exp_dirichlet_component_ = np.exp(
_dirichlet_expectation_2d(self.components_))
if __debug__:
logger.debug("After m step: expElogbeta %s", str(self.exp_dirichlet_component_))
self.n_batch_iter_ += 1
return
def _check_non_neg_array(self, X, whom):
"""check X format
check X format and make sure no negative value in X.
Parameters
----------
X : array-like or sparse matrix
"""
X = check_array(X, accept_sparse='csr')
check_non_negative(X, whom)
return X
def partial_fit(self, X, y=None):
"""Online VB with Mini-Batch update.
Parameters
----------
X : array-like or sparse matrix, shape=(n_samples, n_features)
Document word matrix.
Returns
-------
self
"""
self._check_params()
X = self._check_non_neg_array(X,
"LatentDirichletAllocation.partial_fit")
n_samples, n_features = X.shape
batch_size = self.batch_size
# initialize parameters or check
if not hasattr(self, 'components_'):
self._init_latent_vars(n_features)
if n_features != self.components_.shape[1]:
raise ValueError(
"The provided data has %d dimensions while "
"the model was trained with feature size %d." %
(n_features, self.components_.shape[1]))
for idx_slice in gen_batches(n_samples, batch_size):
self._em_step(X[idx_slice, :], total_samples=self.total_samples,
batch_update=False)
return self
def fit(self, X, y=None):
"""Learn model for the data X with variational Bayes method.
When `learning_method` is 'online', use mini-batch update.
Otherwise, use batch update.
Parameters
----------
X : array-like or sparse matrix, shape=(n_samples, n_features)
Document word matrix.
Returns
-------
self
"""
self._check_params()
X = self._check_non_neg_array(X, "LatentDirichletAllocation.fit")
n_samples, n_features = X.shape
max_iter = self.max_iter
evaluate_every = self.evaluate_every
learning_method = self.learning_method
batch_size = self.batch_size
# initialize parameters
self._init_latent_vars(n_features)
# change to perplexity later
last_bound = None
for i in xrange(max_iter):
if learning_method == 'online':
for idx_slice in gen_batches(n_samples, batch_size):
self._em_step(X[idx_slice, :], total_samples=n_samples,
batch_update=False)
else:
# batch update
self._em_step(X, total_samples=n_samples, batch_update=True)
# check perplexity
if evaluate_every > 0 and (i + 1) % evaluate_every == 0:
if __debug__:
logger.debug("Checking perplexity from fit")
doc_topics_distr, _ = self._e_step(X, cal_sstats=False,
random_init=False)
bound = self.perplexity(X, doc_topics_distr,
sub_sampling=False)
if self.verbose:
print('iteration: %d, perplexity: %.4f' % (i + 1, bound))
if last_bound and abs(last_bound - bound) < self.perp_tol:
break
last_bound = bound
self.n_iter_ += 1
return self
def transform(self, X):
"""Transform data X according to the fitted model.
Parameters
----------
X : array-like or sparse matrix, shape=(n_samples, n_features)
Document word matrix.
Returns
-------
doc_topic_distr : shape=(n_samples, n_topics)
Document topic distribution for X.
"""
if __debug__:
logger.debug("Transforming X according to the fitted model")
if not hasattr(self, 'components_'):
raise NotFittedError("no 'components_' attribute in model."
" Please fit model first.")
# make sure feature size is the same in fitted model and in X
X = self._check_non_neg_array(X, "LatentDirichletAllocation.transform")
n_samples, n_features = X.shape
if n_features != self.components_.shape[1]:
raise ValueError(
"The provided data has %d dimensions while "
"the model was trained with feature size %d." %
(n_features, self.components_.shape[1]))
doc_topic_distr, _ = self._e_step(X, cal_sstats=False,
random_init=False)
return doc_topic_distr
def _approx_bound(self, X, doc_topic_distr, sub_sampling):
"""Estimate the variational bound.
Estimate the variational bound over "all documents" using only the
documents passed in as X. Since log-likelihood of each word cannot
be computed directly, we use this bound to estimate it.
Parameters
----------
X : array-like or sparse matrix, shape=(n_samples, n_features)
Document word matrix.
doc_topic_distr : array, shape=(n_samples, n_topics)
Document topic distribution. In the literature, this is called
gamma.
sub_sampling : boolean, optional, (default=False)
Compensate for subsampling of documents.
It is used in calcuate bound in online learning.
Returns
-------
score : float
"""
if __debug__:
logger.debug("Estimating approx bound")
def _loglikelihood(prior, distr, dirichlet_distr, size):
# calculate log-likelihood
score = np.sum((prior - distr) * dirichlet_distr)
score += np.sum(gammaln(distr) - gammaln(prior))
score += np.sum(gammaln(prior * size) - gammaln(np.sum(distr, 1)))
return score
is_sparse_x = sp.issparse(X)
n_samples, n_topics = doc_topic_distr.shape
n_features = self.components_.shape[1]
score = 0
dirichlet_doc_topic = _dirichlet_expectation_2d(doc_topic_distr)
if __debug__:
logger.debug("Initialised dirichlet_doc_topic aka Elogtheta %s", dirichlet_doc_topic)
dirichlet_component_ = _dirichlet_expectation_2d(self.components_)
if __debug__:
logger.debug("component aka lambda %s", self.components_)
if __debug__:
logger.debug("dirichlet_component aka Elogbeta %s", dirichlet_component_)
doc_topic_prior = self.doc_topic_prior_
topic_word_prior = self.topic_word_prior_
if is_sparse_x:
X_data = X.data
X_indices = X.indices
X_indptr = X.indptr
# E[log p(docs | theta, beta)]
for idx_d in xrange(0, n_samples):
if is_sparse_x:
ids = X_indices[X_indptr[idx_d]:X_indptr[idx_d + 1]]
cnts = X_data[X_indptr[idx_d]:X_indptr[idx_d + 1]]
else:
ids = np.nonzero(X[idx_d, :])[0]
cnts = X[idx_d, ids]
temp = (dirichlet_doc_topic[idx_d, :, np.newaxis]
+ dirichlet_component_[:, ids])
norm_phi = logsumexp(temp)
score += np.dot(cnts, norm_phi)
if __debug__:
logger.debug("Doc_id %s, score %s", idx_d,score)
# compute E[log p(theta | alpha) - log q(theta | gamma)]
score += _loglikelihood(doc_topic_prior, doc_topic_distr,
dirichlet_doc_topic, self.n_topics)
if __debug__:
logger.debug("Added E[log p(theta | alpha) - log q(theta | gamma)]=%s", score)
# Compensate for the subsampling of the population of documents
if sub_sampling:
doc_ratio = float(self.total_samples) / n_samples
score *= doc_ratio
if __debug__:
logger.debug("Multiplied by %s subsample ratio", doc_ratio, score)
# E[log p(beta | eta) - log q (beta | lambda)]
score += _loglikelihood(topic_word_prior, self.components_,
dirichlet_component_, n_features)
return score
def score(self, X, y=None):
"""Calculate approximate log-likelihood as score.
Parameters
----------
X : array-like or sparse matrix, shape=(n_samples, n_features)
Document word matrix.
Returns
-------
score : float
Use approximate bound as score.
"""
X = self._check_non_neg_array(X, "LatentDirichletAllocation.score")
doc_topic_distr = self.transform(X)
score = self._approx_bound(X, doc_topic_distr, sub_sampling=False)
return score
def perplexity(self, X, doc_topic_distr=None, sub_sampling=False):
"""Calculate approximate perplexity for data X.
Perplexity is defined as exp(-1. * log-likelihood per word)
Parameters
----------
X : array-like or sparse matrix, [n_samples, n_features]
Document word matrix.
doc_topic_distr : None or array, shape=(n_samples, n_topics)
Document topic distribution.
If it is None, it will be generated by applying transform on X.
Returns
-------
score : float
Perplexity score.
"""
if __debug__:
logger.debug("calculating perplexity")
if not hasattr(self, 'components_'):
raise NotFittedError("no 'components_' attribute in model."
" Please fit model first.")
X = self._check_non_neg_array(X,
"LatentDirichletAllocation.perplexity")
if doc_topic_distr is None:
if __debug__:
logger.debug("Initialising doc topic distr by calling transform")
doc_topic_distr = self.transform(X)
else:
n_samples, n_topics = doc_topic_distr.shape
if n_samples != X.shape[0]:
raise ValueError("Number of samples in X and doc_topic_distr"
" do not match.")
if n_topics != self.n_topics:
raise ValueError("Number of topics does not match.")
current_samples = X.shape[0]
bound = self._approx_bound(X, doc_topic_distr, sub_sampling)
if sub_sampling:
word_cnt = X.sum() * (float(self.total_samples) / current_samples)
else:
word_cnt = X.sum()
perword_bound = bound / word_cnt
if __debug__:
logger.debug("bound %s, word_cnt %s, perword_bound %s", bound, word_cnt, perword_bound)
return np.exp(-1.0 * perword_bound)
#
# # !!! checking if numerical diff is due to python
# # from ._online_lda import (mean_change, _dirichlet_expectation_1d,
# # _dirichlet_expectation_2d)
#
# from scipy.special import gammaln, psi # gamma function utils
# from scipy.special import polygamma
#
# def _dirichlet_expectation_1d(alpha):
# """
# For a vector `theta~Dir(alpha)`, compute `E[log(theta)]`.
#
# """
# if (len(alpha.shape) == 1):
# result = psi(alpha) - psi(np.sum(alpha))
# else:
# result = psi(alpha) - psi(np.sum(alpha, 1))[:, np.newaxis]
# return (np.exp(result)).astype(alpha.dtype) # keep the same precision as input
#
# def _dirichlet_expectation_2d(alpha):
# """
# For a vector `theta~Dir(alpha)`, compute `E[log(theta)]`.
#
# """
# if (len(alpha.shape) == 1):
# result = psi(alpha) - psi(np.sum(alpha))
# else:
# result = psi(alpha) - psi(np.sum(alpha, 1))[:, np.newaxis]
# return result.astype(alpha.dtype) # keep the same precision as input
#
# def mean_change(last_d, doc_topic_d):
# return np.mean(abs(last_d - doc_topic_d))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment