Last active
July 28, 2016 09:28
-
-
Save affo/4d7ee1a650d68b3d2a0e52b3068e3079 to your computer and use it in GitHub Desktop.
Implementation for sequence models at LxMLS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np
import lxmls.readers.simple_sequence as ssr

# Load the toy walk/shop/clean sequence dataset shipped with LxMLS.
simple = ssr.SimpleSequence()
print("{} {}".format(simple.train, type(simple.train.seq_list)))
print("{} {}".format(simple.test, type(simple.test.seq_list)))
for s in simple.train.seq_list:
    print("{} {}".format(s.x, s.y))

# Collect the distinct state ids (ys) and observation ids (xs) seen
# anywhere in the training set, in a single pass (the original built
# throwaway lists first).  Ids are assumed to be 0..len-1 integers,
# since they are used directly as array indices below.
ys = set()
xs = set()
for s in simple.train.seq_list:
    ys.update(s.y)
    xs.update(s.x)
# --- MLE count statistics: initial, transition and final counts -----
# NOTE: np.int was removed in NumPy 1.24; plain int is the supported dtype.
M = len(simple.train.seq_list)       # cardinality of the training set
N = len(simple.train.seq_list[0].y)  # length of a training sequence
                                     # (assumes all sequences share this
                                     # length — TODO confirm for SimpleSequence)

# c_init[k]  : number of sequences that start in state k
# c_trans[k, l]: number of transitions l -> k (y[i] == k, y[i-1] == l)
# c_final[k] : number of sequences that end in state k
c_init = np.zeros(len(ys), dtype=int)
c_trans = np.zeros((len(ys), len(ys)), dtype=int)
c_final = np.zeros(len(ys), dtype=int)
# One pass over the data replaces the original O(|Y|^2 * M * N)
# quadruple loop; the resulting counts are identical.
for s in simple.train.seq_list:
    c_init[s.y[0]] += 1
    c_final[s.y[N - 1]] += 1
    for i in range(1, N):
        c_trans[s.y[i], s.y[i - 1]] += 1
# --- Emission counts ------------------------------------------------
# c_emiss[j, k]: number of times observation j was emitted from state k.
# A single pass over the data replaces the original O(|X|*|Y|*M*N) loop;
# the counts are identical.
c_emiss = np.zeros((len(xs), len(ys)), dtype=int)
for s in simple.train.seq_list:
    for i in range(N):
        c_emiss[s.x[i], s.y[i]] += 1

print('>>> Init')
print(c_init)
print('>>> Transi')
print(c_trans)
print('>>> Final')
print(c_final)
print('>>> Emiss')
print(c_emiss)
# --- Maximum-likelihood probability estimates -----------------------
p_init = c_init / float(c_init.sum())

# From state i one either transitions somewhere or stops, so the
# normaliser for column i is (transitions out of i) + (stops at i);
# this makes p_trans[:, i] and p_final[i] a proper distribution together.
p_trans = np.zeros((len(ys), len(ys)))
for i in range(len(ys)):
    p_trans[:, i] = c_trans[:, i] / float(c_trans[:, i].sum() + c_final[i])
p_final = np.zeros(len(ys))
for i in range(len(ys)):
    p_final[i] = c_final[i] / float(c_trans[:, i].sum() + c_final[i])
# Emission columns are normalised per state.
p_emiss = np.zeros((len(xs), len(ys)))
for i in range(len(ys)):
    p_emiss[:, i] = c_emiss[:, i] / float(c_emiss[:, i].sum())

print('>>> PInit')
print(p_init)
print('>>> PTransi')
print(p_trans)
print('>>> PFinal')
print(p_final)
print('>>> PEmiss')
print(p_emiss)
# END training | |
# Forward probability | |
# Let's calculate a sequence posterior:
#  - the probability of the state sequence being Y, given
#    that X was emitted.
# A state posterior:
#  - the probability of a particular state being y_i, given that X was emitted.
# A transition posterior:
#  - the probability of a state bi-gram, given X.
def forward(x, i, c_k):
    """Recursive forward probability: the probability of emitting
    x[0:i] and being in state c_k at position i.

    Position N + 1 acts as the virtual stop position, so
    forward(x, N + 1, anything) is the likelihood of the whole
    observation sequence (x must end with a sentinel there).
    """
    if i <= 1:
        # Base case: start in c_k and emit the first observation.
        return p_init[c_k] * p_emiss[x[i - 1], c_k]
    if i >= N + 1:
        # Stop position: weight each last state by its stop probability.
        return sum(p_final[prev] * forward(x, i - 1, prev) for prev in ys)
    incoming = sum(p_trans[c_k, prev] * forward(x, i - 1, prev) for prev in ys)
    return incoming * p_emiss[x[i - 1], c_k]
# Likelihood via the forward pass; the trailing -1 sentinel stands in
# for the virtual stop position N + 1.
x_in = simple.test.seq_list[0].x + [-1]
print("Forward Log-Likelihood {} = {}".format(x_in, np.log(forward(x_in, N + 1, -1))))
def backward(x, i, c_l):
    """Recursive backward probability: the probability of emitting
    x[i+1:] and stopping, given state c_l at position i.

    x must carry a leading sentinel (x[0] is unused) so that position j
    emits x[j]; backward(x, 0, anything) is the sequence likelihood.
    """
    if i >= N:
        # Past the last position: only the stop probability remains.
        return p_final[c_l]
    if i <= 0:
        # Initial step: sum over the first real state (position 1).
        # BUG FIX: position 1 emits x[1] == x[i + 1]; the original
        # indexed x[i + 2], skipping the first observation's emission
        # (masked in the demo because the test sequence repeats its
        # first symbol).
        res = 0
        for c_k in ys:
            res += p_init[c_k] * backward(x, 1, c_k) * p_emiss[x[i + 1], c_k]
        return res
    res = 0
    for c_k in ys:
        res += p_trans[c_k, c_l] * backward(x, i + 1, c_k) * p_emiss[x[i + 1], c_k]
    return res
# Likelihood via the backward pass; the leading -1 sentinel shifts the
# observations so that position j emits x[j].
x_in = [-1] + simple.test.seq_list[0].x
print("Backward Log-Likelihood {} = {}".format(x_in, np.log(backward(x_in, 0, -1))))
def fb(x, i):
    """Sequence likelihood as sum_k forward_i(k) * backward_i(k).

    The result must be the same for every split position i; x is the
    raw observation list, padded here with the sentinels each pass needs.
    """
    padded_fwd = x + [-1]
    padded_bwd = [-1] + x
    return sum(forward(padded_fwd, i, state) * backward(padded_bwd, i, state)
               for state in ys)
x_in = simple.test.seq_list[0].x
px = fb(x_in, 2)  # any interior split position yields the same likelihood
print("FB Log-Likelihood {} = {}".format(x_in, np.log(px)))
for i in range(1, N):
    r = fb(x_in, i)
    # Compare floats with a tolerance instead of the fragile
    # str() round-trip equality of the original.
    assert np.isclose(r, px), "{} != {}".format(r, px)
### Viterbi
# Move every model table to log space so Viterbi can add scores
# (zero probabilities become -inf, which max() handles correctly).
p_init = np.log(p_init)
p_emiss = np.log(p_emiss)
p_trans = np.log(p_trans)
p_final = np.log(p_final)
def viterbi(x):
    """Return (best log-score, decoded state sequence) for observations x."""
    start_scores = np.zeros(len(ys), dtype=np.float64)
    return _vit_rec(x, 0, start_scores, [])
def _vit_rec(x, i, acc, seq):
    """Viterbi recursion in log space.

    acc[k] is the best log-score of any state path over positions
    0..i-1 that ends in state k; seq accumulates one backpointer array
    per step so the best path can be recovered at the base case.
    Returns (best total log-score, best state sequence).

    BUG FIX: the original tiled/max'ed update collapsed acc to a
    constant vector at i == 0 (every row of the tiled matrices was
    identical) and built the path from per-step argmaxes of the
    accumulator instead of backtracking, so neither the score nor the
    sequence was the true Viterbi solution.
    """
    if i >= N:
        total = acc + p_final
        state = int(total.argmax())
        # Recover the best path by walking the backpointers from the
        # best final state.
        path = [state]
        for bp in reversed(seq):
            state = int(bp[state])
            path.append(state)
        path.reverse()
        return total.max(), path
    if i == 0:
        # First position: start score plus first emission.
        acc = p_init + p_emiss[x[0], :]
    else:
        # cand[k, l] = best score ending in l at i-1, then moving l -> k
        # (p_trans[k, l] is the log-probability of transition l -> k).
        cand = p_trans + acc[np.newaxis, :]
        seq.append(cand.argmax(1))  # best predecessor for each state k
        acc = cand.max(1) + p_emiss[x[i], :]
    return _vit_rec(x, i + 1, acc, seq)
# Decode the first test sequence with Viterbi.
x_in = simple.test.seq_list[0].x
score, seq = viterbi(x_in)
print("Viterbi  {} is {} {}".format(x_in, score, seq))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.