Skip to content

Instantly share code, notes, and snippets.

Created October 6, 2013 17:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save anonymous/6856835 to your computer and use it in GitHub Desktop.
Save anonymous/6856835 to your computer and use it in GitHub Desktop.
import numpy as np
import itertools as it
def normalize(x):
return x / np.sum(x)
def normalizerow(x):
return (x.T / np.sum(x, axis=1)).T
def normalizecol(x):
return x / np.sum(x, axis=0)
def almost_equal(x, y, tol=1e-3):
return np.fabs(x - y) <= tol
def almost_equal_array(x, y, tol=1e-3):
return (np.fabs(x - y) <= tol).all()
def randompath(rnd, X, T):
return rnd.random_integers(low=0, high=X-1, size=T)
def randomstate(rnd, Z):
return rnd.random_integers(low=0, high=Z-1)
def randomobs(rnd, X):
return rnd.random_integers(low=0, high=X-1)
def randompos(rnd, T):
return rnd.random_integers(low=0, high=T-1)
def assert_almost_equal(a, b):
if almost_equal(a, b):
print "[PASSED] a=%f, b=%f" % (a, b)
else:
print "[FAILED] a=%f, b=%f" % (a, b)
assert False
def assert_almost_equal_array(a, b):
if almost_equal_array(a, b):
print "[PASSED] a=%s, b=%s" % (str(a), str(b))
else:
print "[FAILED] a=%s, b=%s" % (str(a), str(b))
assert False
class MultinomialHMM(object):
def __init__(self, Z, X, T):
self._Z, self._X, self._T = Z, X, T
def _forward(self, Pi, A, B, X):
"""
Returns all forward computations of alpha, with model H = (Pi, A, B),
for a single observation sequence X
shape: (Z, T)
"""
alphas = np.zeros((self._Z, self._T))
for i in xrange(self._Z):
alphas[i, 0] = Pi[i] * B[X[0], i]
for t in xrange(1, self._T):
for i in xrange(self._Z):
alphas[i, t] = np.sum((alphas[z, t - 1] * A[z, i] * B[X[t], i] for z in xrange(self._Z)))
return alphas
def _backward(self, Pi, A, B, X):
"""
Returns all backward computations of beta, with model H = (Pi, A, B),
for a single observation sequence X
shape: (Z, T)
"""
betas = np.zeros((self._Z, self._T))
for i in xrange(self._Z):
betas[i, self._T - 1] = 1.0
for t in xrange(self._T - 2, -1, -1):
for i in xrange(self._Z):
betas[i, t] = np.sum((betas[z, t + 1] * A[i, z] * B[X[t + 1], z] for z in xrange(self._Z)))
return betas
def argmax_over_obs_bruteforce(self, fn):
"""returns (argmax, value)"""
best, best_value = None, None
for obs in it.product(*[range(self._X) for _ in xrange(self._T)]):
v = fn(obs)
if not best_value or v > best_value:
best, best_value = obs, v
return best, best_value
def argmax_obs_given_latent_bruteforce(self, z, t):
"""returns argmax_{X_{1:T}} P(X_{1:T} | z_t=z)"""
return self.argmax_over_obs_bruteforce(lambda X: self.prob_obs_given_latent(X, z, t))
def argmax_latent_given_obs_bruteforce(self, z, t):
"""returns argmax_{X_{1:T}} P(z_t=z | X_{1:T})"""
return self.argmax_over_obs_bruteforce(lambda X: self.prob_latent_given_obs(X, z, t))
def prob_latent_given_obs(self, X, z, t):
"""returns P(z_t=z | X)"""
return self._prob_latent_given_obs(X, z, t)[0]
def _prob_latent_given_obs(self, X, z, t):
"""returns (P(z_t=z | X), P(X))"""
alphas = self._forward(self._Pi, self._A, self._B, X)
betas = self._backward(self._Pi, self._A, self._B, X)
denom = np.sum(alphas[:, self._T - 1])
return alphas[z, t] * betas[z, t] / denom, denom
def prob_latent_given_obs_bruteforce(self, X, z, t):
"""returns P(z_t=z | X)"""
return self._prob_latent_given_obs_bruteforce(X, z, t)[0]
def _prob_latent_given_obs_bruteforce(self, X, z, t):
"""returns (P(z_t=z | X), P(X))"""
# compute joint P(z_t=z, X) for each value of z
joints = np.zeros(self._Z)
for zv in xrange(self._Z):
zargs = [range(self._Z) for _ in xrange(t)] + \
[[zv]] + \
[range(self._Z) for _ in xrange(t+1, self._T)]
for zdata in it.product(*zargs):
joints[zv] += self.prob_joint(X, zdata)
px = joints.sum()
return joints[z] / px, px
def prob_obs_given_latent(self, X, z, t):
"""returns P(X | z_t=z)"""
p_latent_given_obs, p_obs = self._prob_latent_given_obs(X, z, t)
p_latent = self.prob_latent(t)
return p_latent_given_obs * p_obs / p_latent[z]
def prob_obs_given_latent_bruteforce(self, X, z, t):
"""returns P(X | z_t=z)"""
top = 0.0
zargs = [range(self._Z) for _ in xrange(t)] + \
[[z]] + \
[range(self._Z) for _ in xrange(t+1, self._T)]
for zdata in it.product(*zargs):
top += self.prob_joint(X, zdata)
bot = 0.0
xargs = [range(self._X) for _ in xrange(self._T)]
zargs = [range(self._Z) for _ in xrange(t)] + \
[[z]] + \
[range(self._Z) for _ in xrange(t+1, self._T)]
args = xargs + zargs
for data in it.product(*args):
thisX = np.array(data[:self._T])
Z = np.array(data[self._T:])
bot += self.prob_joint(thisX, Z)
return top / bot
def prob_latent(self, t):
"""returns [P(z_t=1), ..., P(z_t=N)]"""
assert t >= 0 and t < self._T
v_before = np.array(self._Pi)
for _ in xrange(1, t + 1):
v_before_next = np.zeros(self._Z)
for z_cur in xrange(self._Z):
v_before_next[z_cur] = (self._A[:, z_cur] * v_before).sum()
v_before = v_before_next
v_after = np.ones(self._Z)
for _ in xrange(self._T - 1, t, -1):
v_after_next = np.zeros(self._Z)
for z_cur in xrange(self._Z):
v_after_next[z_cur] = (self._A[z_cur, :] * v_after).sum()
v_after = v_after_next
return v_before * v_after
def prob_latent_bruteforce(self, t):
"""returns [P(z_t=1), ..., P(z_t=N)]"""
assert t >= 0 and t < self._T
ret = np.zeros(self._Z)
for z in xrange(self._Z):
xargs = [range(self._X) for _ in xrange(self._T)]
zargs = [range(self._Z) for _ in xrange(t)] + [[z]] + [range(self._Z) for _ in xrange(t+1, self._T)]
args = xargs + zargs
for data in it.product(*args):
X = np.array(data[:self._T])
Z = np.array(data[self._T:])
ret[z] += self.prob_joint(X, Z)
return ret
def prob_joint(self, X, Z):
res = self._Pi[Z[0]] * self._B[X[0], Z[0]]
for t in xrange(1, self._T):
res *= self._A[Z[t-1], Z[t]] * self._B[X[t], Z[t]]
return res
def test(Z, X, T, seed=None):
rnd = np.random.RandomState(seed) if seed is not None else np.random
Pi = normalize(rnd.uniform(low=0.1, high=1.0, size=Z))
A = normalizerow(rnd.uniform(low=0.1, high=1.0, size=(Z, Z)))
B = normalizecol(rnd.uniform(low=0.1, high=1.0, size=(X, Z)))
h = MultinomialHMM(Z, X, T)
h._Pi, h._A, h._B = Pi, A, B
# Test P(X | z_t=z)
path, z, t = randompath(rnd, X, T), \
randomstate(rnd, Z), \
randompos(rnd, T)
a = h.prob_obs_given_latent(path, z, t)
b = h.prob_obs_given_latent_bruteforce(path, z, t)
assert_almost_equal(a, b)
# Test P(z_t=z | X)
path, z, t = randompath(rnd, X, T), \
randomstate(rnd, Z), \
randompos(rnd, T)
a = h.prob_latent_given_obs(path, z, t)
b = h.prob_latent_given_obs_bruteforce(path, z, t)
assert_almost_equal(a, b)
# Test P(z_t=z)
z = randomstate(rnd, Z)
a = h.prob_latent(z)
b = h.prob_latent_bruteforce(z)
assert_almost_equal_array(a, b)
# Check the condition we care about
z, t = randomstate(rnd, Z), randompos(rnd, T)
a = h.argmax_obs_given_latent_bruteforce(z, t)
b = h.argmax_latent_given_obs_bruteforce(z, t)
if a[0] == b[0]:
print "*** Condition holds ***"
print r'\argmax_{X} P(z_{t=%d}=%d | X) \neq \argmax_{X} P(X | z_{t=%d}=%d)' % (t, z, t, z)
else:
print "*** Condition FAILS ***"
print r'\argmax_{X} P(z_{t=%d}=%d | X) \neq \argmax_{X} P(X | z_{t=%d}=%d)' % (t, z, t, z)
print a
print b
if __name__ == '__main__':
test(2, 3, 4)
test(3, 3, 4)
test(4, 3, 4)
test(4, 4, 4)
#test(4, 4, 5)
# vim: set sts=4 ts=4 sw=4:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment