-
-
Save dirko/1d596ca757a541da96ac3caa6f291229 to your computer and use it in GitHub Desktop.
# Keras==1.0.6 | |
from keras.models import Sequential | |
import numpy as np | |
from keras.layers.recurrent import LSTM | |
from keras.layers.core import TimeDistributedDense, Activation | |
from keras.preprocessing.sequence import pad_sequences | |
from keras.layers.embeddings import Embedding | |
from sklearn.cross_validation import train_test_split | |
from keras.layers import Merge | |
from keras.backend import tf | |
from lambdawithmask import Lambda as MaskLambda | |
from sklearn.metrics import confusion_matrix, accuracy_score | |
raw = open('wikigold.conll.txt', 'r').readlines() | |
all_x = [] | |
point = [] | |
for line in raw: | |
stripped_line = line.strip().split(' ') | |
point.append(stripped_line) | |
if line == '\n': | |
all_x.append(point[:-1]) | |
point = [] | |
all_x = all_x[:-1] | |
lengths = [len(x) for x in all_x] | |
short_x = [x for x in all_x if len(x) < 64] | |
X = [[c[0] for c in x] for x in short_x] | |
y = [[c[1] for c in y] for y in short_x] | |
all_text = [c for x in X for c in x] | |
words = list(set(all_text)) | |
word2ind = {word: index for index, word in enumerate(words)} | |
ind2word = {index: word for index, word in enumerate(words)} | |
labels = list(set([c for x in y for c in x])) | |
label2ind = {label: (index + 1) for index, label in enumerate(labels)} | |
ind2label = {(index + 1): label for index, label in enumerate(labels)} | |
print 'Input sequence length range: ', max(lengths), min(lengths) | |
maxlen = max([len(x) for x in X]) | |
print 'Maximum sequence length:', maxlen | |
def encode(x, n):
    """Return a one-hot vector of length ``n`` with a 1 at position ``x``.

    The vector is allocated with ``np.zeros(n)``, so every other entry is 0.
    An ``x`` outside the valid index range raises ``IndexError`` from NumPy.
    """
    one_hot = np.zeros(n)
    one_hot[x] = 1
    return one_hot
X_enc = [[word2ind[c] for c in x] for x in X] | |
X_enc_reverse = [[c for c in reversed(x)] for x in X_enc] | |
max_label = max(label2ind.values()) + 1 | |
y_enc = [[0] * (maxlen - len(ey)) + [label2ind[c] for c in ey] for ey in y] | |
y_enc = [[encode(c, max_label) for c in ey] for ey in y_enc] | |
X_enc_f = pad_sequences(X_enc, maxlen=maxlen) | |
X_enc_b = pad_sequences(X_enc_reverse, maxlen=maxlen) | |
y_enc = pad_sequences(y_enc, maxlen=maxlen) | |
(X_train_f, X_test_f, X_train_b, | |
X_test_b, y_train, y_test) = train_test_split(X_enc_f, X_enc_b, y_enc, | |
test_size=11*32, train_size=45*32, random_state=42) | |
print 'Training and testing tensor shapes:' | |
print X_train_f.shape, X_test_f.shape, X_train_b.shape, X_test_b.shape, y_train.shape, y_test.shape | |
max_features = len(word2ind) | |
embedding_size = 128 | |
hidden_size = 32 | |
out_size = len(label2ind) + 1 | |
def reverse_func(x, mask=None):
    """Reverse ``x`` along its second axis (axis 1) using ``tf.reverse``.

    ``mask`` is accepted so the function matches the signature expected by
    the masking-aware Lambda layer it is passed to, but it is not used here.
    """
    # One flag per axis: only axis 1 is flipped.
    flip_axes = [False, True, False]
    return tf.reverse(x, flip_axes)
model_forward = Sequential() | |
model_forward.add(Embedding(max_features, embedding_size, input_length=maxlen, mask_zero=True)) | |
model_forward.add(LSTM(hidden_size, return_sequences=True)) | |
model_backward = Sequential() | |
model_backward.add(Embedding(max_features, embedding_size, input_length=maxlen, mask_zero=True)) | |
model_backward.add(LSTM(hidden_size, return_sequences=True)) | |
model_backward.add(MaskLambda(function=reverse_func, mask_function=reverse_func)) | |
model = Sequential() | |
model.add(Merge([model_forward, model_backward], mode='concat')) | |
model.add(TimeDistributedDense(out_size)) | |
model.add(Activation('softmax')) | |
model.compile(loss='categorical_crossentropy', optimizer='adam') | |
batch_size = 32 | |
model.fit([X_train_f, X_train_b], y_train, batch_size=batch_size, nb_epoch=40, | |
validation_data=([X_test_f, X_test_b], y_test)) | |
score = model.evaluate([X_test_f, X_test_b], y_test, batch_size=batch_size) | |
print('Raw test score:', score) | |
def score(yh, pr):
    """Flatten gold and predicted label ids, skipping leading padding.

    For every row of ``yh`` the first position holding a label id greater
    than 0 is located; the row of ``yh`` and the matching row of ``pr`` are
    both cut to start at that position, and all rows are concatenated.

    Args:
        yh: 2-D sequence of gold label ids, one row per sentence, where 0
            marks padding.
        pr: 2-D sequence of predicted label ids with the same layout.

    Returns:
        A pair ``(fyh, fpr)`` of flat lists of equal length holding the gold
        and predicted ids of the retained positions.

    Rows of ``yh`` without any positive id (all padding) are skipped instead
    of raising ``IndexError``.
    """
    fyh = []
    fpr = []
    for gold_row, pred_row in zip(yh, pr):
        gold_row = np.asarray(gold_row)
        pred_row = np.asarray(pred_row)
        labelled = np.where(gold_row > 0)[0]
        if labelled.size == 0:
            # Nothing but padding in this row: no positions to evaluate.
            continue
        start = labelled[0]
        fyh.extend(gold_row[start:])
        fpr.extend(pred_row[start:])
    return fyh, fpr
pr = model.predict_classes([X_train_f, X_train_b]) | |
yh = y_train.argmax(2) | |
fyh, fpr = score(yh, pr) | |
print 'Training accuracy:', accuracy_score(fyh, fpr) | |
print 'Training confusion matrix:' | |
print confusion_matrix(fyh, fpr) | |
pr = model.predict_classes([X_test_f, X_test_b]) | |
yh = y_test.argmax(2) | |
fyh, fpr = score(yh, pr) | |
print 'Testing accuracy:', accuracy_score(fyh, fpr) | |
print 'Testing confusion matrix:' | |
print confusion_matrix(fyh, fpr) |
Thanks for this great tutorial!
I have a problem with the masking function:
model_backward.add(MaskLambda(function=reverse_func, mask_function=reverse_func))
MaskLambda cannot be resolved in my version of Keras, so I replaced it with the Masking() layer, which I imported with "from keras.layers import Masking".
Is this fine so far?
Another thing is that the arguments are not accepted by this function:
model_backward.add(Masking(function=reverse_func, mask_function=reverse_func))
How could I replace it?
TypeError: ('Keyword argument not understood:', 'function')
Thank you for sharing. Amazing code.
I am a beginner in RNNs and LSTMs, so my question might be very basic: why are we including X_enc_reverse (or X_train_b) in the model?
Also, can you please suggest some decent documentation to read about it?
Hello, I am trying to train my NER model and I found an example script on how to do it with Keras http://pythonexample.com/code/keras%20unidirectional%20tagger/
I am surprised to see that there are no comments in the code, so I did some Google searching and found the same thing in your git repo.
Would you please suggest a tutorial where the code is well explained?
Thank you.
Keras Training Error:
When I run the code it renders the following error:
File "C:\Users\Kidane\Anaconda3\lib\site-packages\keras\engine\training.py", line 108, in standardize_input_data
str(array.shape))
Exception: Error when checking model target: expected activation_1 to have shape (None, 63, 32) but got array with shape (1440, 63, 6)
Please help me on how to fix this issue,
Thanks,
Kidane