Skip to content

Instantly share code, notes, and snippets.

@kylebgorman
Created July 26, 2011 02:24
Show Gist options
  • Save kylebgorman/1105800 to your computer and use it in GitHub Desktop.
Save kylebgorman/1105800 to your computer and use it in GitHub Desktop.
Classes for building and sampling from probability distributions; Constantine Lignos tells me this is a variation on the "Shannon-Miller-Selfridge" algorithm which does the summing once and uses bisection each time (as opposed to summing every sample). I
#!/usr/bin/env python
# ProbDist.py: Two classes for probability distributions and sampling.
# Kyle Gorman <kgorman@ling.upenn.edu>
from math import fsum
from bisect import bisect
from random import random
from collections import defaultdict
class MLProbDist(object):
"""
A class representing a maximum likelihood estimate probability
distribution to be used for sampling
"""
def __init__(self, item2prob=None):
"""
If an object with a __getitem__ method is passed, it is used to
initialize a the frequency distribution: the keys are the objects and
the numerical values returned by item2prob[key] are the counts or
probabilities. It can also be populated using the increment() class
method.
>>> prob = {'b': .4, 'c': .4, 'a': .1, 'd': .1}
>>> p = MLProbDist(prob)
>>> print all(i in prob.keys() for i in p.sample(3))
True
>>> for key in prob:
... prob[key] *= 1000
>>> p = MLProbDist(prob)
>>> print all(i in prob.keys() for i in p.sample(3))
True
"""
self.prob = False
if item2prob:
self.item2freq = item2prob
self.freq2prob()
else:
self.item2freq = defaultdict(int)
def __str__(self):
return '<MLProbDist with %d items>' % len(self.item2freq)
def __iter__(self):
return iter(self.item2freq)
def __getitem__(self, item):
"""
Returns freq or prob of item
"""
return self.item2freq[item]
def increment(self, item, count=1):
assert not self.prob, 'already probabilitized...sorry'
self.item2freq[item] += count
def update(self, item_list):
"""
Increment a number of items
"""
assert not self.prob, 'already probabilitized...sorry'
for item in item_list:
self.increment(item)
def _probs(self, prob2items):
"""
Construct probability distribution...not for users.
"""
self.probs = []
adjuster = 0.
for p in prob2items:
for item in prob2items[p]:
pa = p + adjuster
self.probs.append(pa)
adjuster = pa
self.items = sorted(self.item2freq.iterkeys(), key=self.item2freq.get,
reverse=True)
# note that this depends on the STABLE properties of sorted().
self.prob = True
def freq2prob(self):
"""
Make it a true probability distribution, if it isn't already.
"""
assert not self.prob, 'already probabilitized...sorry'
# normalize
norm = fsum(self.item2freq.values())
prob2items = defaultdict(list)
for item in self.item2freq:
p = self.item2freq[item] / norm # compute normalized prob
self.item2freq[item] = p # write it in
prob2items[p].append(item)
# generate the distribution for sampling
self._probs(prob2items)
self.prob = True
def choose(self):
"""
Return one random draw from the distribution
"""
assert self.prob, 'not yet probabilized: run freq2prob()'
return self.items[bisect(self.probs, random())]
def sample(self, n):
"""
Lazily eeturn n random draws from the distribution
"""
assert self.prob, 'not yet probabilized: run freq2prob()'
for i in xrange(n):
yield self.items[bisect(self.probs, random())]
class WittenBellProbDist(MLProbDist):
"""
A class representing a Witten-Bell probability distribution to be used for
sampling
>>> p = WittenBellProbDist()
>>> p.update('a a a a a b b b b c a a a a b b c c'.split())
>>> p.increment('d', 0)
>>> p.increment('e', 0)
>>> print p
<WittenBellProbDist with 5 items>
>>> p.freq2prob()
>>> print round(p['d'], 3) # which has never been seen before...
0.071
"""
def __str__(self):
return '<WittenBellProbDist with %d items>' % len(self.item2freq)
def __iter__(self):
return iter(self.item2freq)
def freq2prob(self):
"""
Make it a true probability distribution, if it isn'y already.
"""
assert not self.prob, 'already probabilitized...sorry'
# compute the unseen_p estimate
Z = 0
N = fsum(self.item2freq.values())
T = float(len([i for i in self.item2freq.values() if i > 0.]))
NT = N + T
for item in self.item2freq: # get count of unseens
if self.item2freq[item] == 0.:
Z += 1
unseen_p = T / (Z * NT)
# normalize
prob2items = defaultdict(list)
for item in self.item2freq: # normalize
p = self.item2freq[item]
if p == 0.:
self.item2freq[item] = unseen_p
prob2items[unseen_p].append(item)
else:
p = self.item2freq[item] / NT
self.item2freq[item] = p
prob2items[p].append(item)
# generate the distribution for sampling
self._probs(prob2items)
self.prob = True
## some testing code
if __name__ == '__main__':
import doctest
doctest.testmod()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment