Skip to content

Instantly share code, notes, and snippets.

@amintos
Created April 29, 2015 01:00
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save amintos/2b667bae7f709112fe44 to your computer and use it in GitHub Desktop.
Save amintos/2b667bae7f709112fe44 to your computer and use it in GitHub Desktop.
LDA with Gibbs Sampling in Python
import random
def dirichlet(alpha):
"""Sample dirichlet-distributed vector from Dir(alpha)"""
s = [random.gammavariate(a, 1) for a in alpha]
norm = sum(s)
return [d / norm for d in s]
def multinomial(p):
"""Sample multinomial-distributed index from Mul(p)"""
r = random.random()
for i in range(len(p)):
r = r - p[i]
if r < 0:
return i
return len(p) - 1
def normalize(v):
"""L1-Normalize to obtain a vector of probabilities"""
norm = sum(v)
return [float(i) / norm for i in v]
class LDA(object):
def __init__(self, topics=None, topic_priors=None):
self.topics = topics or []
self.n_topics = len(topics) if topics else 0
self.n_features = len(topics[0]) if topics else 0
self.topic_priors = topic_priors or []
def sample(self, length):
"""Sample document of given length from the model"""
doc = [0] * self.n_features
# sample topic mixture for this document
topic_dist = dirichlet(self.topic_priors)
for i in xrange(length):
# draw topic from mixture
topic_id = multinomial(topic_dist)
# draw word from topic
feature_id = multinomial(self.topics[topic_id])
doc[feature_id] += 1
return doc
def associate(self, doc_id, topic_id, feature_id, by=1):
"""Assume certain document receives a given feature from a topic"""
self.doc_topics[doc_id][topic_id] += by
self.topic_features[topic_id][feature_id] += by
self.doc_freqs[doc_id] += by
self.topic_freqs[topic_id] += by
def set_priors(self, docs, n_features, n_topics,
feature_priors=None, topic_priors=None):
"""Initialize document-topic-feature assigmnents"""
self.n_topics = n_topics
self.n_docs = n_docs = len(docs)
self.n_features = n_features
self.doc_topics = [[0] * n_topics for _ in xrange(n_docs)]
self.doc_freqs = [0] * n_docs
self.topic_features = [[0] * n_features for _ in xrange(n_topics)]
self.topic_freqs = [0] * n_topics
self.doc_position_topics = {}
self.topic_priors = topic_priors or [1. / n_topics] * n_topics
self.feature_priors = feature_priors or [1. / n_features] * n_features
for doc_id, doc in enumerate(docs):
position = 0
for feature_id, n in doc:
for i in range(n):
topic_id = random.randint(0, self.n_topics - 1)
self.associate(doc_id, topic_id, feature_id)
self.doc_position_topics[(doc_id, position)] = topic_id
position += 1
def update(self, docs):
for doc_id, doc in enumerate(docs):
position = 0
for feature_id, n in doc:
for i in range(n):
topic_id = self.doc_position_topics[(doc_id, position)]
self.associate(doc_id, topic_id, feature_id, by=-1)
dist = self.topic_dist(doc_id, feature_id)
topic_id = multinomial(dist)
self.associate(doc_id, topic_id, feature_id)
self.doc_position_topics[(doc_id, position)] = topic_id
position += 1
def topic_dist(self, doc_id, feature_id):
"""Distribution of potential topics explaining feature in document"""
feature_prior = self.feature_priors[feature_id]
topics = self.doc_topics[doc_id]
doc_freq = self.doc_freqs[doc_id]
return normalize([
(self.topic_features[i][feature_id] + feature_prior)
/ (self.topic_freqs[i] + 1.0) *
(topics[i] + self.topic_priors[i])
/ (doc_freq + self.topic_priors[i] * self.n_topics)
for i in xrange(self.n_topics)])
def explain(self, docs, n_features, n_topics, steps, feature_priors=None, topic_priors=None):
"""Explain documents in terms of topics"""
self.set_priors(docs, n_features, n_topics, feature_priors, topic_priors)
for i in xrange(steps):
self.update(docs)
self.topics = map(normalize, self.topic_features)
# -----------------------------------------------------------------------------
# TESTING
# -----------------------------------------------------------------------------
if __name__ == '__main__':
def test_topics(n=5):
"""Generate 2*n topics of n*n words"""
topics = []
for i in xrange(n):
v = [1.0 / n if j % n == i else 0.0 for j in xrange(n*n)]
h = [1.0 / n if j / n == i else 0.0 for j in xrange(n*n)]
topics.append(v)
topics.append(h)
return topics
def print_topic(t):
n = int(len(t) ** 0.5)
assert n * n == len(t)
for i in xrange(n):
for j in xrange(n):
print '%0.2f' % t[i * n + j],
print
def sparsify(vector):
"""Return (index, value)-list of a sparse vector"""
return [(i, v) for i, v in enumerate(vector) if v != 0]
# for t in test_topics(5):
# print '---'
# print_topic(t)
lda1 = LDA(test_topics(5), topic_priors=[0.2] * 10)
docs = [sparsify(lda1.sample(100)) for i in xrange(500)]
lda2 = LDA()
lda2.explain(docs, 25, 10, 100)
for i in xrange(10):
print_topic(lda2.topics[i])
print '-' * 30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment