-
-
Save stephentu/6858179 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import itertools as it | |
def normalize(x): | |
return x / np.sum(x) | |
def normalizerow(x): | |
return (x.T / np.sum(x, axis=1)).T | |
def normalizecol(x): | |
return x / np.sum(x, axis=0) | |
def almost_equal(x, y, tol=1e-3): | |
return np.fabs(x - y) <= tol | |
def almost_equal_array(x, y, tol=1e-3): | |
return (np.fabs(x - y) <= tol).all() | |
def randompath(rnd, X, T): | |
return rnd.random_integers(low=0, high=X-1, size=T) | |
def randomstate(rnd, Z): | |
return rnd.random_integers(low=0, high=Z-1) | |
def randomobs(rnd, X): | |
return rnd.random_integers(low=0, high=X-1) | |
def randompos(rnd, T): | |
return rnd.random_integers(low=0, high=T-1) | |
def assert_almost_equal(a, b): | |
if almost_equal(a, b): | |
print "[PASSED] a=%f, b=%f" % (a, b) | |
else: | |
print "[FAILED] a=%f, b=%f" % (a, b) | |
assert False | |
def assert_almost_equal_array(a, b): | |
if almost_equal_array(a, b): | |
print "[PASSED] a=%s, b=%s" % (str(a), str(b)) | |
else: | |
print "[FAILED] a=%s, b=%s" % (str(a), str(b)) | |
assert False | |
class MultinomialHMM(object): | |
def __init__(self, Z, X, T): | |
self._Z, self._X, self._T = Z, X, T | |
def _forward(self, Pi, A, B, X): | |
""" | |
Returns all forward computations of alpha, with model H = (Pi, A, B), | |
for a single observation sequence X | |
shape: (Z, T) | |
""" | |
alphas = np.zeros((self._Z, self._T)) | |
for i in xrange(self._Z): | |
alphas[i, 0] = Pi[i] * B[X[0], i] | |
for t in xrange(1, self._T): | |
for i in xrange(self._Z): | |
alphas[i, t] = np.sum((alphas[z, t - 1] * A[z, i] * B[X[t], i] for z in xrange(self._Z))) | |
return alphas | |
def _backward(self, Pi, A, B, X): | |
""" | |
Returns all backward computations of beta, with model H = (Pi, A, B), | |
for a single observation sequence X | |
shape: (Z, T) | |
""" | |
betas = np.zeros((self._Z, self._T)) | |
for i in xrange(self._Z): | |
betas[i, self._T - 1] = 1.0 | |
for t in xrange(self._T - 2, -1, -1): | |
for i in xrange(self._Z): | |
betas[i, t] = np.sum((betas[z, t + 1] * A[i, z] * B[X[t + 1], z] for z in xrange(self._Z))) | |
return betas | |
def argmax_over_obs_bruteforce(self, fn): | |
"""returns (argmax, value)""" | |
best, best_value = None, None | |
for obs in it.product(*[range(self._X) for _ in xrange(self._T)]): | |
v = fn(obs) | |
if not best_value or v > best_value: | |
best, best_value = obs, v | |
return best, best_value | |
def argmax_obs_given_latent_bruteforce(self, z, t): | |
"""returns argmax_{X_{1:T}} P(X_{1:T} | z_t=z)""" | |
return self.argmax_over_obs_bruteforce(lambda X: self.prob_obs_given_latent(X, z, t)) | |
def argmax_latent_given_obs_bruteforce(self, z, t): | |
"""returns argmax_{X_{1:T}} P(z_t=z | X_{1:T})""" | |
return self.argmax_over_obs_bruteforce(lambda X: self.prob_latent_given_obs(X, z, t)) | |
def prob_latent_given_obs(self, X, z, t): | |
"""returns P(z_t=z | X)""" | |
return self._prob_latent_given_obs(X, z, t)[0] | |
def _prob_latent_given_obs(self, X, z, t): | |
"""returns (P(z_t=z | X), P(X))""" | |
alphas = self._forward(self._Pi, self._A, self._B, X) | |
betas = self._backward(self._Pi, self._A, self._B, X) | |
denom = np.sum(alphas[:, self._T - 1]) | |
return alphas[z, t] * betas[z, t] / denom, denom | |
def prob_latent_given_obs_bruteforce(self, X, z, t): | |
"""returns P(z_t=z | X)""" | |
return self._prob_latent_given_obs_bruteforce(X, z, t)[0] | |
def _prob_latent_given_obs_bruteforce(self, X, z, t): | |
"""returns (P(z_t=z | X), P(X))""" | |
# compute joint P(z_t=z, X) for each value of z | |
joints = np.zeros(self._Z) | |
for zv in xrange(self._Z): | |
zargs = [range(self._Z) for _ in xrange(t)] + \ | |
[[zv]] + \ | |
[range(self._Z) for _ in xrange(t+1, self._T)] | |
for zdata in it.product(*zargs): | |
joints[zv] += self.prob_joint(X, zdata) | |
px = joints.sum() | |
return joints[z] / px, px | |
def prob_obs_given_latent(self, X, z, t): | |
"""returns P(X | z_t=z)""" | |
p_latent_given_obs, p_obs = self._prob_latent_given_obs(X, z, t) | |
p_latent = self.prob_latent(t) | |
return p_latent_given_obs * p_obs / p_latent[z] | |
def prob_obs_given_latent_bruteforce(self, X, z, t): | |
"""returns P(X | z_t=z)""" | |
top = 0.0 | |
zargs = [range(self._Z) for _ in xrange(t)] + \ | |
[[z]] + \ | |
[range(self._Z) for _ in xrange(t+1, self._T)] | |
for zdata in it.product(*zargs): | |
top += self.prob_joint(X, zdata) | |
bot = 0.0 | |
xargs = [range(self._X) for _ in xrange(self._T)] | |
zargs = [range(self._Z) for _ in xrange(t)] + \ | |
[[z]] + \ | |
[range(self._Z) for _ in xrange(t+1, self._T)] | |
args = xargs + zargs | |
for data in it.product(*args): | |
thisX = np.array(data[:self._T]) | |
Z = np.array(data[self._T:]) | |
bot += self.prob_joint(thisX, Z) | |
return top / bot | |
def prob_latent(self, t): | |
"""returns [P(z_t=1), ..., P(z_t=N)]""" | |
assert t >= 0 and t < self._T | |
v_before = np.array(self._Pi) | |
for _ in xrange(1, t + 1): | |
v_before_next = np.zeros(self._Z) | |
for z_cur in xrange(self._Z): | |
v_before_next[z_cur] = (self._A[:, z_cur] * v_before).sum() | |
v_before = v_before_next | |
v_after = np.ones(self._Z) | |
for _ in xrange(self._T - 1, t, -1): | |
v_after_next = np.zeros(self._Z) | |
for z_cur in xrange(self._Z): | |
v_after_next[z_cur] = (self._A[z_cur, :] * v_after).sum() | |
v_after = v_after_next | |
return v_before * v_after | |
def prob_latent_bruteforce(self, t): | |
"""returns [P(z_t=1), ..., P(z_t=N)]""" | |
assert t >= 0 and t < self._T | |
ret = np.zeros(self._Z) | |
for z in xrange(self._Z): | |
xargs = [range(self._X) for _ in xrange(self._T)] | |
zargs = [range(self._Z) for _ in xrange(t)] + [[z]] + [range(self._Z) for _ in xrange(t+1, self._T)] | |
args = xargs + zargs | |
for data in it.product(*args): | |
X = np.array(data[:self._T]) | |
Z = np.array(data[self._T:]) | |
ret[z] += self.prob_joint(X, Z) | |
return ret | |
def prob_joint(self, X, Z): | |
res = self._Pi[Z[0]] * self._B[X[0], Z[0]] | |
for t in xrange(1, self._T): | |
res *= self._A[Z[t-1], Z[t]] * self._B[X[t], Z[t]] | |
return res | |
def test(Z, X, T, seed=None): | |
print "-------------- Testing Z=%d, X=%d, T=%d --------------" % (Z, X, T) | |
rnd = np.random.RandomState(seed) if seed is not None else np.random | |
Pi = normalize(rnd.uniform(low=0.1, high=1.0, size=Z)) | |
A = normalizerow(rnd.uniform(low=0.1, high=1.0, size=(Z, Z))) | |
B = normalizecol(rnd.uniform(low=0.1, high=1.0, size=(X, Z))) | |
h = MultinomialHMM(Z, X, T) | |
h._Pi, h._A, h._B = Pi, A, B | |
# Test P(X | z_t=z) | |
path, z, t = randompath(rnd, X, T), \ | |
randomstate(rnd, Z), \ | |
randompos(rnd, T) | |
a = h.prob_obs_given_latent(path, z, t) | |
b = h.prob_obs_given_latent_bruteforce(path, z, t) | |
assert_almost_equal(a, b) | |
# Test P(z_t=z | X) | |
path, z, t = randompath(rnd, X, T), \ | |
randomstate(rnd, Z), \ | |
randompos(rnd, T) | |
a = h.prob_latent_given_obs(path, z, t) | |
b = h.prob_latent_given_obs_bruteforce(path, z, t) | |
assert_almost_equal(a, b) | |
# Test P(z_t=z) | |
z = randomstate(rnd, Z) | |
a = h.prob_latent(z) | |
b = h.prob_latent_bruteforce(z) | |
assert_almost_equal_array(a, b) | |
# Check the condition we care about | |
z, t = randomstate(rnd, Z), randompos(rnd, T) | |
a = h.argmax_obs_given_latent_bruteforce(z, t) | |
b = h.argmax_latent_given_obs_bruteforce(z, t) | |
if a[0] == b[0]: | |
print "*** Condition holds ***" | |
print r'\argmax_{X} P(z_{t=%d}=%d | X) \neq \argmax_{X} P(X | z_{t=%d}=%d)' % (t, z, t, z) | |
else: | |
print "*** Condition FAILS ***" | |
print r'\argmax_{X} P(z_{t=%d}=%d | X) \neq \argmax_{X} P(X | z_{t=%d}=%d)' % (t, z, t, z) | |
pol_a = h.prob_obs_given_latent(a[0], z, t) | |
plo_a = h.prob_latent_given_obs(a[0], z, t) | |
pol_b = h.prob_obs_given_latent(b[0], z, t) | |
plo_b = h.prob_latent_given_obs(b[0], z, t) | |
assert_almost_equal(pol_a, a[1]) | |
assert_almost_equal(plo_b, b[1]) | |
assert pol_a >= pol_b | |
assert plo_b >= plo_a | |
print r'P(X=%s | z_{t=%d}=%d) = %f [maximized]' % (a[0], t, z, pol_a) | |
print r'P(z_{t=%d}=%d | X=%s) = %f' % (t, z, a[0], plo_a) | |
print r'P(X=%s | z_{t=%d}=%d) = %f' % (b[0], t, z, pol_b) | |
print r'P(z_{t=%d}=%d | X=%s) = %f [maximized]' % (t, z, b[0], plo_b) | |
if __name__ == '__main__': | |
test(2, 3, 4) | |
test(3, 3, 4) | |
test(4, 3, 4) | |
test(4, 4, 4) | |
#test(4, 4, 5) | |
# vim: set sts=4 ts=4 sw=4: |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment