Skip to content

Instantly share code, notes, and snippets.

@affo
Last active July 28, 2016 09:28
Show Gist options
  • Save affo/4d7ee1a650d68b3d2a0e52b3068e3079 to your computer and use it in GitHub Desktop.
Save affo/4d7ee1a650d68b3d2a0e52b3068e3079 to your computer and use it in GitHub Desktop.
Implementation for sequence models at LxMLS
import numpy as np
import lxmls.readers.simple_sequence as ssr
simple = ssr.SimpleSequence()
print simple.train, type(simple.train.seq_list)
print simple.test, type(simple.test.seq_list)
for s in simple.train.seq_list:
print s.x, s.y
ys = []
for s in simple.train.seq_list:
ys.extend(s.y)
xs = []
for s in simple.train.seq_list:
xs.extend(s.x)
ys = set(ys)
xs = set(xs)
# computing the initial counts
c_init = np.zeros(len(ys), dtype=np.int)
for i in xrange(len(ys)):
for s in simple.train.seq_list:
if s.y[0] == i: c_init[i] += 1
# computing the transition counts
M = len(simple.train.seq_list) # cardinality of training set
N = len(simple.train.seq_list[0].y) # length of an element of the training set
c_trans = np.zeros((len(ys), len(ys)), dtype=np.int)
for k in xrange(len(ys)):
for l in xrange(len(ys)):
for m in xrange(M):
for i in xrange(1, N):
y_i_m = simple.train.seq_list[m].y[i]
y_i1_m = simple.train.seq_list[m].y[i - 1]
if y_i_m == k and y_i1_m == l: c_trans[k, l] += 1
# computing the final counts
ys = set(ys)
c_final = np.zeros(len(ys), dtype=np.int)
for i in xrange(len(ys)):
for s in simple.train.seq_list:
if s.y[N - 1] == i: c_final[i] += 1
# computing the emission counts
c_emiss = np.zeros((len(xs), len(ys)), dtype=np.int)
for j in xrange(len(xs)):
for k in xrange(len(ys)):
for m in xrange(M):
for i in xrange(N):
x_i_m = simple.train.seq_list[m].x[i]
y_i_m = simple.train.seq_list[m].y[i]
if y_i_m == k and x_i_m == j: c_emiss[j, k] += 1
print
print '>>> Init'
print c_init
print '>>> Transi'
print c_trans
print '>>> Final'
print c_final
print '>>> Emiss'
print c_emiss
# calculating probabilities
p_init = c_init / float(c_init.sum())
p_trans = np.zeros((len(ys), len(ys)))
for i in xrange(len(ys)):
p_trans[:, i] = c_trans[:, i] / float(c_trans[:, i].sum() + c_final[i])
p_final = np.zeros(len(ys))
for i in xrange(len(ys)):
p_final[i] = c_final[i] / float(c_trans[:, i].sum() + c_final[i])
p_emiss = np.zeros((len(xs), len(ys)))
for i in xrange(len(ys)):
p_emiss[:, i] = c_emiss[:, i] / float(c_emiss[:, i].sum())
print
print '>>> PInit'
print p_init
print '>>> PTransi'
print p_trans
print '>>> PFinal'
print p_final
print '>>> PEmiss'
print p_emiss
# END training
# Forward probability
# Let's calculate a sequence posterior:
# - the probability of the sequence of state being Y, given
# that X was emitted.
# A state posterior:
# - the p of a precise state being y_i, given that X was emitted
# A transition posterior:
# - the p of a bi-gram, given X
def forward(x, i, c_k):
emiss = p_emiss[x[i - 1], c_k]
if i <= 1:
return p_init[c_k] * emiss
if i >= N + 1:
res = 0
for c_l in ys:
res += p_final[c_l] * forward(x, i - 1, c_l)
return res
res = 0
for c_l in ys:
res += p_trans[c_k, c_l] * forward(x, i - 1, c_l)
return res * emiss
x_in = simple.test.seq_list[0].x + [-1]
print "Forward Log-Likelihood", x_in, "=", np.log(forward(x_in, N + 1, -1))
def backward(x, i, c_l):
if i >= N:
return p_final[c_l]
if i <= 0:
res = 0
for c_k in ys:
res += p_init[c_k] * backward(x, 1, c_k) * p_emiss[x[i + 2], c_k]
return res
res = 0
for c_k in ys:
res += p_trans[c_k, c_l] * backward(x, i + 1, c_k) * p_emiss[x[i + 1], c_k]
return res
x_in = [-1] + simple.test.seq_list[0].x
print "Backward Log-Likelihood", x_in, "=", np.log(backward(x_in, 0, -1))
def fb(x, i):
res = 0
for c_k in ys:
res += forward(x + [-1], i, c_k) * backward([-1] + x, i, c_k)
return res
x_in = simple.test.seq_list[0].x
px = fb(x_in, 2) # random i
print "FB Log-Likelihood", x_in, "=", np.log(px)
for i in xrange(1, N):
r = fb(x_in, i)
assert str(r) == str(px), "{} != {}".format(r, px)
### Viterbi
# Let's put to logs
p_init = np.log(p_init)
p_emiss = np.log(p_emiss)
p_trans = np.log(p_trans)
p_final = np.log(p_final)
def viterbi(x):
return _vit_rec(x, 0, np.zeros(len(ys), dtype=np.float64), [])
def _vit_rec(x, i, acc, seq):
if i >= N:
acc += p_final
return acc.max(), seq
if i == 0:
tr = np.tile(p_init, (len(ys), 1))
else:
tr = p_trans.T
old_acc = np.tile(acc, (len(ys), 1))
emit_xi = np.tile(p_emiss[x[i], :], (len(ys), 1))
acc = (old_acc + tr + emit_xi).max(1)
seq.append(acc.argmax())
return _vit_rec(x, i + 1, acc, seq)
x_in = simple.test.seq_list[0].x
score, seq = viterbi(x_in)
print "Viterbi ", x_in, "is", score, seq
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment