Dialogue act recognition Keras model
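This gist contains, in order: the training script, the data-loading utilities imported as dasio, and the globvars hyper-parameter module.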
import numpy as np
from keras.preprocessing import sequence
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation, Flatten, TimeDistributedDense
from keras.layers.recurrent import LSTM
from keras.layers.embeddings import Embedding
from keras.utils import np_utils
from keras.preprocessing.text import Tokenizer
from keras.models import Graph
from keras.utils.np_utils import accuracy
import re
import copy
import sys
import globvars
if globvars.plotWeights:
    import matplotlib.pyplot as plt
from dasio import *
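
# L2-normalise the rows of a weight matrix and rescale them by f; used below to
# renormalise the embedding matrices when globvars.normweights is set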
def normalized(a, f=0.5, axis=-1, order=2):
    l2 = np.atleast_1d(np.linalg.norm(a, order, axis))
    l2[l2 == 0] = 1
    b = a / np.expand_dims(l2, axis)
    return f * b
# don't do cross-validation, but rather take the train/test split from Stolcke
for xv in range(1):
    print("Loading data...")
    (X_train, Y_train0, Z_train), (X_test, Y_test0, Z_test) = load_data(nb_words=globvars.max_words, xval=xv)

    if globvars.addPrevInputs:
        # note: this BOW is also interesting because it includes all words, without truncation.
        # TODO: add such a vector also for the current sentence
        tokenizer = Tokenizer(nb_words=globvars.max_words)
        PX_tr = tokenizer.sequences_to_matrix(X_train, mode='binary')
        PX_te = tokenizer.sequences_to_matrix(X_test, mode='binary')
        PX_tr = np.roll(PX_tr, 1, axis=0)
        PX_tr[0][:] = 0
        PX_te = np.roll(PX_te, 1, axis=0)
        PX_te[0][:] = 0
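    # PX_tr/PX_te now hold, for utterance i, the binary bag-of-words of utterance i-1:
    # np.roll shifts the rows down by one, and the first row (no predecessor) is zeroed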
print("#sentences train: "+str(len(X_train))+" test: "+str(len(X_test)))
print("out nb_classes= "+str(globvars.nb_classes))
# here we still have the full sequences
if globvars.bigram:
BX_tr = copy.deepcopy(X_train)
BZ_tr = copy.deepcopy(Z_train)
BX_te = copy.deepcopy(X_test)
BZ_te = copy.deepcopy(Z_test)
# now replace single word with bigram
for i in xrange(len(X_train)):
BX_tr[i][0]=globvars.voc.get('STARTUTT')
BZ_tr[i][0]=globvars.vocpos.get('PADDING')
for j in xrange(len(X_train[i])-1): BX_tr[i][j+1]=X_train[i][j]
for j in xrange(len(Z_train[i])-1): BZ_tr[i][j+1]=Z_train[i][j]
for i in xrange(len(X_test)):
BX_te[i][0]=globvars.voc.get('STARTUTT')
BZ_te[i][0]=globvars.vocpos.get('PADDING')
for j in xrange(len(X_test[i])-1): BX_te[i][j+1]=X_test[i][j]
for j in xrange(len(Z_test[i])-1): BZ_te[i][j+1]=Z_test[i][j]
BX_tr= sequence.pad_sequences(BX_tr, maxlen=globvars.maxlen, padding='post', truncating='post')
BX_te= sequence.pad_sequences(BX_te, maxlen=globvars.maxlen, padding='post', truncating='post')
BZ_tr= sequence.pad_sequences(BZ_tr, maxlen=globvars.maxlen, padding='post', truncating='post')
BZ_te= sequence.pad_sequences(BZ_te, maxlen=globvars.maxlen, padding='post', truncating='post')
X_train = sequence.pad_sequences(X_train, maxlen=globvars.maxlen, padding='post', truncating='post')
X_test = sequence.pad_sequences(X_test, maxlen=globvars.maxlen, padding='post', truncating='post')
Z_test = sequence.pad_sequences(Z_test, maxlen=globvars.maxlen, padding='post', truncating='post')
Z_train = sequence.pad_sequences(Z_train, maxlen=globvars.maxlen, padding='post', truncating='post')
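    # all word (X) and POS-tag (Z) sequences are now padded or truncated to globvars.maxlen tokens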
    if True:
        Y_train = np.zeros((len(X_train), globvars.nb_classes))  # ,dtype=np.float32)
        nw = 0
        co = {}
        for t in range(globvars.nb_classes):
            co[t] = 0
        for t in range(len(X_train)):
            Y_train[t][Y_train0[t]] = 1
            co[Y_train0[t]] = co.get(Y_train0[t])+1
        print("counts labels: "+str(co))
        Y_test = np.zeros((len(Y_test0), globvars.nb_classes))  # ,dtype=np.float32)
        for t in range(len(Y_test)):
            Y_test[t][Y_test0[t]] = 1
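    # Y_train/Y_test are one-hot label matrices of shape (n_utterances, nb_classes);
    # Y_train0/Y_test0 keep the integer class indices for the accuracy computations below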
    # optional: add noise to the words to increase corpus size
    ncopy = 0
    arrs = ()
    for j in xrange(ncopy):
        tmpdbl = np.copy(X_train)
        for i in xrange(len(tmpdbl)): tmpdbl[i][np.random.randint(0, globvars.maxlen)] = np.random.randint(0, len(globvars.voc))
        arrs = arrs+(tmpdbl,)
    arrs = arrs+(X_train,)
    X_train = np.concatenate(arrs, axis=0)
    arrs = ()
    for j in xrange(ncopy):
        tmpdbl = np.copy(Z_train)
        arrs = arrs+(tmpdbl,)
    arrs = arrs+(Z_train,)
    Z_train = np.concatenate(arrs, axis=0)
    if globvars.bigram:
        arrs = ()
        for j in xrange(ncopy):
            tmpdbl = np.copy(BX_tr)
            arrs = arrs+(tmpdbl,)
        arrs = arrs+(BX_tr,)
        BX_tr = np.concatenate(arrs, axis=0)
        arrs = ()
        for j in xrange(ncopy):
            tmpdbl = np.copy(BZ_tr)
            arrs = arrs+(tmpdbl,)
        arrs = arrs+(BZ_tr,)
        BZ_tr = np.concatenate(arrs, axis=0)
    if globvars.addPrevInputs:
        arrs = ()
        for j in xrange(ncopy):
            tmpdbl = np.copy(PX_tr)
            arrs = arrs+(tmpdbl,)
        arrs = arrs+(PX_tr,)
        PX_tr = np.concatenate(arrs, axis=0)
    arrs = ()
    for j in xrange(ncopy):
        tmpdbl = np.copy(Y_train)
        arrs = arrs+(tmpdbl,)
    arrs = arrs+(Y_train,)
    Y_train = np.concatenate(arrs, axis=0)
    arrs = ()
    for j in xrange(ncopy):
        tmpdbl = np.copy(Y_train0)
        arrs = arrs+(tmpdbl,)
    arrs = arrs+(Y_train0,)
    Y_train0 = np.concatenate(arrs, axis=0)
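    # ncopy=0 above disables this augmentation; with ncopy>0, ncopy noisy copies of the training
    # data are appended, each with one random word position per sentence replaced by a random word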
    model = Graph()
    model.add_input(name='input', input_shape=(globvars.maxlen,), dtype=int)
    model.add_input(name='inputpos', input_shape=(globvars.maxlen,), dtype=int)
    wembed = Embedding(globvars.max_words, globvars.embedsize, input_length=globvars.maxlen, trainable=globvars.trainEmbed)
    pembed = Embedding(len(globvars.vocpos), len(globvars.vocpos)*3/4, input_length=globvars.maxlen, trainable=globvars.trainEmbed)
    model.add_node(wembed, name='embed', input='input')
    model.add_node(pembed, name='embedpos', input='inputpos')
    if globvars.bigram:
        model.add_input(name='binput', input_shape=(globvars.maxlen,), dtype=int)
        model.add_input(name='binputpos', input_shape=(globvars.maxlen,), dtype=int)
        bwembed = Embedding(globvars.max_words, globvars.embedsize, input_length=globvars.maxlen, trainable=globvars.trainEmbed)
        bpembed = Embedding(len(globvars.vocpos), len(globvars.vocpos)*3/4, input_length=globvars.maxlen, trainable=globvars.trainEmbed)
        model.add_node(bwembed, name='bembed', input='binput')
        model.add_node(bpembed, name='bembedpos', input='binputpos')
        hidlay = TimeDistributedDense(globvars.nhids[0], activation='tanh', trainable=True)
        model.add_node(hidlay, name='dense1', inputs=['embed', 'embedpos', 'bembed', 'bembedpos'])
        out = 'dense1'
        for idx, n in enumerate(globvars.nhids[1:]):
            i = idx+2
            out = 'dense'+str(i)
            model.add_node(TimeDistributedDense(n, activation='tanh'), name=out, input='dense'+str(i-1))
        model.add_node(LSTM(globvars.hidden, return_sequences=False, go_backwards=True), name='lstm', input=out)
    else:
        model.add_node(LSTM(globvars.hidden, return_sequences=False, go_backwards=True), name='lstm', inputs=['embed', 'embedpos'])
    model.add_node(Dropout(globvars.dropout), name='dropout', input='lstm')
    if globvars.addPrevInputs:
        model.add_input(name='previnput', input_shape=(globvars.max_words,))
        model.add_node(Dense(globvars.nfin), name='merge', inputs=['dropout', 'previnput'])
        model.add_node(Activation('tanh'), name='mergeact', input='merge')
        model.add_node(Dense(globvars.nb_classes), name='dense', input='mergeact')
    else:
        model.add_node(Dense(globvars.nb_classes), name='dense', input='dropout')
    model.add_node(Activation('softmax'), name='softmax', input='dense')
    model.add_output(name='output', input='softmax')
    model.compile('adam', {'output': 'categorical_crossentropy'})
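    # architecture: word + POS-tag embeddings feed a backward LSTM, followed by dropout and a
    # softmax over the dialogue-act classes; with bigram=True, embeddings of the current and
    # previous token are first merged through TimeDistributedDense layer(s), and with
    # addPrevInputs=True the previous-utterance bag-of-words is merged in before the output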
    if globvars.initembed:
        print("load init embeddings")
        mots, wemb = load_embeddings()
        nset = 0
        ws = wembed.get_weights()  # current (randomly initialised) embedding weight matrices
        for i in xrange(len(wemb)):
            wi = globvars.voc.get(mots[i])
            if wi:
                ws[0][wi][:] = wemb[i][:]
                nset = nset+1
        wembed.set_weights(ws)
        if globvars.bigram:
            # just to be safe, I don't use np.copy ... ?
            bws = bwembed.get_weights()
            for i in xrange(len(wemb)):
                wi = globvars.voc.get(mots[i])
                if wi:
                    bws[0][wi][:] = wemb[i][:]
            bwembed.set_weights(bws)
        print("loaded and set nembeddings "+str(nset))

    # normalisation of weights:
    if globvars.normweights:
        ws = wembed.get_weights()
        ws[0] = normalized(ws[0])  # [0] or not ??
        wembed.set_weights(ws)
        printWeightsNorm(ws)
        if globvars.bigram:
            bws = bwembed.get_weights()
            bws[0] = normalized(bws[0])
            bwembed.set_weights(bws)
    # test one by one; more flexible, but could be much faster by batch
    rec = np.zeros((len(X_test), globvars.nb_classes))
    # no need to reallocate these arrays for every sentence, because all sentences have the same length
    x = np.zeros((1, len(X_test[0])))
    z = np.zeros((1, len(X_test[0])))
    bx = np.zeros((1, len(X_test[0])))
    bz = np.zeros((1, len(X_test[0])))
    px = np.zeros((1, len(PX_te[0])))  # likewise: the rows of PX_te all have length = vocabulary size
    for i in xrange(len(X_test)):
        x[0][:] = X_test[i][:]
        z[0][:] = Z_test[i][:]
        ar = {'input': x, 'inputpos': z}
        if globvars.addPrevInputs:
            px[0][:] = PX_te[i][:]
            ar['previnput'] = px
        if globvars.bigram:
            bx[0][:] = BX_te[i][:]
            bz[0][:] = BZ_te[i][:]
            ar['binput'] = bx
            ar['binputpos'] = bz
        rec[i] = model.predict(ar).get('output')[0]
    nok = 0
    for i in range(len(X_test)):
        if Y_test0[i] == rec[i].argmax():
            nok = nok+1
    print("test init "+str(nok)+" "+str(len(X_test))+" "+str(float(nok)/float(len(X_test))))
    print("conf matrix")
    ref = np.zeros((globvars.nb_classes, globvars.nb_classes), dtype=np.int32)
    for z in range(len(rec)):
        ref[Y_test0[z], rec[z].argmax()] = ref[Y_test0[z], rec[z].argmax()]+1
    for z in range(globvars.nb_classes):
        print("classGold "+str(z)+": "+str(ref[z].sum())+" post: "+str(ref[:, z].sum()))
    for epo in xrange(globvars.nb_epoch):
        ar = {'input': X_train, 'inputpos': Z_train, 'output': Y_train}
        if globvars.addPrevInputs: ar['previnput'] = PX_tr
        if globvars.bigram:
            ar['binput'] = BX_tr
            ar['binputpos'] = BZ_tr
        model.fit(ar, batch_size=globvars.batch_size, nb_epoch=1, shuffle=globvars.shuf, verbose=1)

        # evaluation on the training set
        ar = {'input': X_train, 'inputpos': Z_train}
        if globvars.addPrevInputs: ar['previnput'] = PX_tr
        if globvars.bigram:
            ar['binput'] = BX_tr
            ar['binputpos'] = BZ_tr
        if globvars.addPrevClass: ar['cheat'] = PY_train  # previous-class ('cheat') input; PY_train is not built in this listing, so addPrevClass must stay False
        rec = model.predict(ar).get('output')
        nok = 0
        for i in range(len(X_train)):
            if Y_train0[i] == rec[i].argmax():
                nok = nok+1
        print("train epoch "+str(epo)+" "+str(nok)+" "+str(len(X_train))+" "+str(float(nok)/float(len(X_train))))

        # evaluation on the test set
        rec = np.zeros((len(X_test), globvars.nb_classes))
        x = np.zeros((1, len(X_test[0])))
        z = np.zeros((1, len(X_test[0])))
        bx = np.zeros((1, len(X_test[0])))
        bz = np.zeros((1, len(X_test[0])))
        px = np.zeros((1, len(PX_te[0])))  # likewise: the rows of PX_te all have length = vocabulary size
        for i in xrange(len(X_test)):
            x[0][:] = X_test[i][:]
            z[0][:] = Z_test[i][:]
            ar = {'input': x, 'inputpos': z}
            if globvars.addPrevInputs:
                px[0][:] = PX_te[i][:]
                ar['previnput'] = px
            if globvars.bigram:
                bx[0][:] = BX_te[i][:]
                bz[0][:] = BZ_te[i][:]
                ar['binput'] = bx
                ar['binputpos'] = bz
            recr = model.predict(ar).get('output')
            rec[i] = recr[0]
        nok = 0
        for i in range(len(X_test)):
            if Y_test0[i] == rec[i].argmax():
                nok = nok+1
        print("test epoch "+str(epo)+" "+str(nok)+" "+str(len(X_test))+" "+str(float(nok)/float(len(X_test))))

    np.save("lstm", model.get_weights())
import numpy as np
import globvars
import re
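
# load_embeddings() returns the list of words and an (n_words, 300) numpy matrix read from
# voc.w2v, which stores one word per line followed by its 300-dimensional vector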
def load_embeddings():
    w2vfile = "voc.w2v"
    print("read w2v embeddings")
    f = open(w2vfile, 'rb')
    nw = 0
    while True:
        s = f.readline()
        if not s:
            break
        nw = nw+1
    f.close()
    wemb = np.zeros((nw, 300))
    mots = []
    f = open(w2vfile, 'rb')
    for w in xrange(nw):
        s = f.readline()
        s = s.strip()
        cols = s.split()
        mots.append(cols[0])
        for i in xrange(1, len(cols)):
            wemb[w][i-1] = float(cols[i])
    f.close()
    return (mots, wemb)
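
# load_data() reads voc.txt, train.conll and test.conll and fills the globvars vocabularies;
# utterances are separated by blank lines in the CoNLL files (word / POS-tag / label columns)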
def load_data(nb_words=1000, seed=123, xval=0):
    '''loads a CoNLL corpus and outputs a list (per sentence) of lists (per word) of word indices
    '''
    # Note that the following code assumes the existence of a column for "POS-tags",
    # but in our experiments this column always contains the same uninformative POSTAG: "WORD".
    # The column is kept just in case POS-tags are required in future experiments.
    corpus = "train.conll"
    testfile = "test.conll"
    vocfile = "voc.txt"
    # the .voc file contains the vocabulary obtained with
    # cut -f1 das.conll | sort | uniq -c | sort -nr > das.voc
    # plus PADDING and UNKNOWN
    print("read the voc "+vocfile+" "+str(nb_words))
    f = open(vocfile, 'rb')
    for i in range(nb_words):
        s = f.readline()
        if not s:
            break
        s = s.strip()
        globvars.voc[s] = i
    f.close()
    globvars.padd = globvars.voc.get('PADDING')
    print("padd in load "+str(globvars.padd))

    print("read the train conll "+str(len(globvars.voc)))
    X, Y, Z = [], [], []
    f = open(corpus, 'rb')
    motsinutt, postinutt = [], []
    nl, nutt = 0, 0
    # sequence.pad will later add 0 for padding, so reserve this postag index
    globvars.vocpos['PADDING'] = 0
    while True:
        s = f.readline()
        if not s: break
        s = s.strip()
        cols = s.split()
        nl = nl+1
        if len(cols) > 0:
            post = globvars.vocpos.get(cols[1])
            if post == None:
                post = len(globvars.vocpos)
                globvars.vocpos[cols[1]] = post
            postinutt.append(post)
            lidx = globvars.voclab.get(cols[2])
            if lidx == None:
                lidx = len(globvars.voclab)
                globvars.voclab[cols[2]] = lidx
            widx = globvars.voc.get(cols[0])
            if widx == None:
                widx = globvars.voc.get('UNKNOWN')
            motsinutt.append(widx)
        else:
            nutt = nutt+1
            X.append(motsinutt)
            Y.append(lidx)
            Z.append(postinutt)
            motsinutt, postinutt = [], []
    f.close()
    print("nvoclab "+str(len(globvars.voclab)))
    print("nb of examples in corpus= "+str(len(X))+" "+str(len(Y))+" "+str(nl)+" "+str(nutt))

    print("read the test conll")
    Xtest, Ytest, Ztest = [], [], []
    f = open(testfile, 'rb')
    motsinutt, postinutt = [], []
    while True:
        s = f.readline()
        if not s:
            break
        cols = s.split()
        if len(cols) > 0:
            post = globvars.vocpos.get(cols[1])
            if post == None:
                post = len(globvars.vocpos)
                globvars.vocpos[cols[1]] = post
            postinutt.append(post)
            lidx = globvars.voclab.get(cols[2])
            if lidx == None:
                lidx = len(globvars.voclab)
                globvars.voclab[cols[2]] = lidx
            widx = globvars.voc.get(cols[0])
            if widx == None:
                widx = globvars.voc.get('UNKNOWN')
            motsinutt.append(widx)
        else:
            Xtest.append(motsinutt)
            Ytest.append(lidx)
            Ztest.append(postinutt)
            motsinutt, postinutt = [], []
    f.close()
    print("nb of examples in corpus= "+str(len(Xtest))+" "+str(len(Ytest)))
    globvars.nb_classes = len(globvars.voclab)
    print("inner nb classes= "+str(globvars.nb_classes)+" "+str(max(globvars.voclab.values())))
    print("classes: "+str(sorted(globvars.voclab.values())))
    print("data loaded: input voc layer= "+str(len(globvars.voc))+" "+str(len(globvars.vocpos))+" output labels layer= "+str(len(globvars.voclab)))
    print(globvars.voclab)
    print(globvars.vocpos)
    # np.random.seed(seed)
    # np.random.shuffle(X)
    # np.random.seed(seed)
    # np.random.shuffle(Y)
    # np.random.seed(seed)
    # np.random.shuffle(Z)
    return (X, Y, Z), (Xtest, Ytest, Ztest)
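
# globvars.py: hyper-parameters and shared vocabularies referenced above as globvars.*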
maxlen = 15  # cut sentences after maxlen words
max_words = 1000
# TODO: check whether this batch_size has been tuned for GPU computation
batch_size = 2048
embedsize = 300
padd = -1  # to be set in load_data
# POS-tags are supported but not used in our experiments
voc, voclab, vocpos = {}, {}, {}
nb_classes = -1
dropout = 0.5
hidden = 50
nfin = 200
# not used if bigram=False:
nhids = [300]
nb_epoch = 30
trainEmbed = True
shuf = True
bigram = False
addPrevInputs = True
# referenced by the training script; the corresponding 'cheat' input is not built there, so keep it False
addPrevClass = False
# must be False if bigram=False:
riskmin = False
normweights = False
initembed = False
plotWeights = False