Created
February 4, 2019 15:51
-
-
Save iwan-rg/4e7f522a53e664607c2a3e664f4c076a to your computer and use it in GitHub Desktop.
Code for "Detecting Errors in Arabic Text using Neural Sequence Labeling"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from google.colab import files

# Colab-only step: open the browser upload widget, then report the name and
# byte length of every file that was received.
uploaded = files.upload()
for fn, payload in uploaded.items():
    print('User uploaded file "{name}" with length {length} bytes'.format(
        name=fn, length=len(payload)))
############# | |
import os
import random as rn

import numpy as np
import tensorflow as tf
import keras
from keras import backend as K

print(keras.__version__)
print(tf.__version__)

# Use one fixed seed for every random number generator involved so that Keras
# runs are reproducible. The particular value does not matter much.
RANDOM_SEED = 42

# Put NumPy's generator into a well-defined initial state.
np.random.seed(RANDOM_SEED)

# Put core Python's generator into a well-defined initial state.
rn.seed(RANDOM_SEED)

# Force TensorFlow to use a single thread: multiple threads are a potential
# source of non-reproducible results.
# For further details, see: https://stackoverflow.com/questions/42022950/
session_conf = tf.ConfigProto(
    intra_op_parallelism_threads=1,
    inter_op_parallelism_threads=1,
)

# Give random number generation in the TensorFlow backend a well-defined
# initial state. For further details, see:
# https://www.tensorflow.org/api_docs/python/tf/set_random_seed
tf.set_random_seed(RANDOM_SEED)

# Install a session built from the single-threaded config as the Keras session.
sess = tf.Session(graph=tf.get_default_graph(), config=session_conf)
K.set_session(sess)

# Setting PYTHONHASHSEED for determinism was not listed anywhere for
# TensorFlow, but apparently it is necessary for the Theano backend
# (https://github.com/fchollet/keras/issues/850).
# NOTE(review): Python reads PYTHONHASHSEED when the interpreter starts, so
# assigning it here should only reach child processes; export it before
# launching the runtime if hash ordering matters -- confirm.
os.environ['PYTHONHASHSEED'] = '0'
############# | |
import pandas as pd
import numpy as np

# Load the token-level dataset. The un-augmented alternative is "Arabic DS.txt".
data = pd.read_csv("Arabic DS Aug.txt", encoding="utf-8")

# Forward-fill missing cells from the row above. DataFrame.ffill() replaces the
# deprecated fillna(method="ffill") spelling and produces the same result.
data = data.ffill()

# Evaluated for interactive inspection only; the result is discarded when this
# is not the last expression of a notebook cell.
data.tail(10)
print(data)
############# | |
# Vocabulary and tag inventories; used only for their sizes below.
# "ENDPAD" is appended as one extra vocabulary entry reserved for padding.
unique_words = set(data["Word"].values)
words = list(unique_words)
words.append("ENDPAD")
n_words = len(words)
print(n_words)

unique_tags = set(data["Tag"].values)
tags = list(unique_tags)
n_tags = len(tags)
print(n_tags)
############## | |
class SentenceGetter(object):
    """Retrieve sentences together with their labels.

    The input frame must provide the columns "Sentence #", "Word" and "Tag".
    Rows are grouped by "Sentence #" and every group becomes a list of
    ``(word, tag)`` tuples.

    Attributes set by the constructor:
        data: the frame passed in, unchanged.
        grouped: pandas Series mapping each "Sentence #" value to its list of
            ``(word, tag)`` tuples.
        sentences: the values of ``grouped`` as a plain list, in the order the
            groupby produced them.
        n_sent: number of the sentence that ``get_next`` will return next
            (starts at 1).
        empty: initialised to False; not read anywhere in this class.
    """

    def __init__(self, data):
        self.n_sent = 1
        self.data = data
        self.empty = False
        self.grouped = self.data.groupby("Sentence #").apply(self._pair_up)
        self.sentences = [s for s in self.grouped]

    @staticmethod
    def _pair_up(group):
        """Return the (word, tag) tuples of one sentence group, in row order."""
        return list(zip(group["Word"].values.tolist(),
                        group["Tag"].values.tolist()))

    def get_next(self):
        """Return the next sentence, or None when there is no such sentence.

        Sentences are looked up under the key "Sentence: <n_sent>"; the counter
        only advances when the lookup succeeds.
        """
        try:
            s = self.grouped["Sentence: {}".format(self.n_sent)]
        except KeyError:
            # Only a missing key means "no more sentences"; any other error is
            # a real problem and is allowed to propagate.
            return None
        self.n_sent += 1
        return s
################### | |
import matplotlib.pyplot as plt

# Group the dataset into sentences and show the first one as a sanity check.
getter = SentenceGetter(data)
sent = getter.get_next()
print(sent)
sentences = getter.sentences

# Histogram of sentence lengths (in tokens).
sentence_lengths = [len(s) for s in sentences]
plt.style.use("ggplot")
plt.hist(sentence_lengths, bins=50)
plt.title('Sentence Lengths')
plt.xlabel('lengths')
plt.ylabel('sentences')
plt.show()
################## | |
from keras.preprocessing.sequence import pad_sequences | |
# Load the word -> index table and flatten it into (word, index) tuples,
# keeping the row order of the file.
word2idx = pd.read_csv("word2idxArA.txt", encoding="utf-8")
index_words = word2idx["Word"].values.tolist()
index_numbers = word2idx["No"].values.tolist()
indexed = list(zip(index_words, index_numbers))
print(indexed)
################## | |
from keras.preprocessing.sequence import pad_sequences | |
# Every sentence is padded (or truncated) to this many tokens.
max_len = 17

# Map each word to every index listed for it in `indexed`, in file order.
# Built once so that encoding a token is a dictionary lookup instead of a scan
# over the whole vocabulary; duplicate entries are kept so the output matches
# the original scan exactly.
word_to_ids = {}
for word, idx in indexed:
    word_to_ids.setdefault(word, []).append(idx)

# Encode each sentence as the list of indices of its words.
# NOTE(review): a word missing from `indexed` contributes nothing, which
# shortens that sentence and shifts it against its label row in `y` -- confirm
# that the index file covers the whole dataset.
XX = []
X = []
for s in sentences:
    XX = []
    for w in s:
        XX.extend(word_to_ids.get(w[0], ()))
    X.append(XX)

print(X[0])
print(len(X))

# Pad at the end with n_words - 1, which is expected to be the index of
# "ENDPAD" in word2idx. Cleaner would be to look up the "ENDPAD" entry itself.
X = pad_sequences(maxlen=max_len, sequences=X, padding="post", value=n_words - 1)
print(X[0])
print(X.shape[1])
################# | |
# Binary label per token: tag "i" becomes 0, every other tag becomes 1.
yy = []
y = []
for s in sentences:
    yy = [0 if w[1] == "i" else 1 for w in s]
    y.append(yy)
print(y)

# Pad label rows at the end with 0 so they line up with the padded inputs.
y = pad_sequences(maxlen=max_len, sequences=y, padding="post", value=0)
print(y[0])
############### | |
import keras.backend.tensorflow_backend as tfb | |
# Not using the following will cause fn=fp and thus precision = recall.
#
# weighted_binary_crossentropy allows class weights to be set (the
# classification is binary), i.e. it makes positive errors larger than negative
# errors. This is useful when the training data is unbalanced.

# Multiplier for positive targets; needs to be tuned (N/P):
# .4 for "Arabic DS" and .28 for "Arabic DS Aug".
POS_WEIGHT = .28


def weighted_binary_crossentropy(target, output):
    """Weighted binary crossentropy between an output and a target tensor.

    POS_WEIGHT is used as the multiplier for the positive targets, which lets
    one trade off recall and precision by up- or down-weighting the cost of a
    positive error relative to a negative error:

    * a weight > 1 decreases the false negative count, increasing recall;
    * a weight < 1 decreases the false positive count, increasing precision.

    Combination of the following functions:

    * keras.losses.binary_crossentropy
    * keras.backend.tensorflow_backend.binary_crossentropy
    * tf.nn.weighted_cross_entropy_with_logits

    Args:
        target: tensor of true labels.
        output: tensor of predicted probabilities (not logits).

    Returns:
        The weighted loss averaged over the last axis.
    """
    # Clip the probabilities away from 0 and 1 so the logit below stays finite.
    eps = tfb._to_tensor(tfb.epsilon(), output.dtype.base_dtype)
    probabilities = tf.clip_by_value(output, eps, 1 - eps)

    # Transform the probabilities back to logits.
    logits = tf.log(probabilities / (1 - probabilities))

    # Compute the weighted loss and average it over the last axis.
    weighted = tf.nn.weighted_cross_entropy_with_logits(
        targets=target,
        logits=logits,
        pos_weight=POS_WEIGHT,
    )
    return tf.reduce_mean(weighted, axis=-1)
################# | |
from keras.models import Model, Input | |
from keras.layers import LSTM, Embedding, Dense, TimeDistributed, Dropout,Reshape, SimpleRNN, Bidirectional | |
from keras.utils import to_categorical, plot_model | |
import numpy | |
from matplotlib import pyplot | |
import sklearn.model_selection | |
from sklearn.model_selection import KFold | |
from pandas import DataFrame | |
from keras.callbacks import EarlyStopping | |
from google.colab import files | |
# Early stopping ends a fold once val_loss has gone `patience` epochs without
# improving; this limits overfitting and removes the need to tune the epoch
# count by hand.
es = EarlyStopping(monitor='val_loss', patience=2)

seed = 42
numpy.random.seed(seed)

# Containers read by the "multiple runs" plots later in the file; nothing in
# this section fills them.
train1 = DataFrame()
val1 = DataFrame()
train2 = DataFrame()
val2 = DataFrame()

# Per-fold scores, in percent; one entry is appended per fold.
Finalprecision = []
Finalrecall = []
Finalaccuracy = []
FinalF = []

# Reverse lookup index -> word(s), built once so that decoding a token is a
# dictionary lookup rather than a scan over the whole vocabulary.
idx_to_words = {}
for word, idx in indexed:
    idx_to_words.setdefault(idx, []).append(word)

# For training, the labels have to be one-hot encoded. The encoding is the same
# for every fold, so it is computed once. With 2 classes, column 0 holds label
# 0 ("i") and column 1 holds label 1.
ycat = to_categorical(y, num_classes=n_tags)

# k=10 with shuffling; the fixed random_state makes the split identical from
# run to run.
kf = KFold(n_splits=10, shuffle=True, random_state=seed)

for i, (train, test) in enumerate(kf.split(X, y)):
    print('Train: %s | test: %s' % (train, test))

    # A fresh model is built for every fold.
    inputs = Input(shape=(max_len,))  # the trailing comma makes this a 1-tuple
    # The embedding layer encodes the input sequence into a sequence of dense
    # 100-dimensional vectors, learned per word.
    hidden = Embedding(input_dim=n_words, output_dim=100, input_length=max_len,
                       name="Embedding")(inputs)
    # Alternatives that were tried in place of the layer below: Dropout(0.2),
    # stacked LSTM(units=125), Bidirectional LSTM(units=25,
    # recurrent_dropout=0.2) and SimpleRNN(units=100, recurrent_dropout=0.2/0.3).
    hidden = Bidirectional(LSTM(units=100, return_sequences=True))(hidden)
    out = TimeDistributed(Dense(n_tags, activation="sigmoid"))(hidden)  # sigmoid output layer
    model = Model(inputs, out)
    model.compile(optimizer="nadam", loss=weighted_binary_crossentropy,
                  metrics=["accuracy"])

    # NOTE(review): the held-out fold drives early stopping here and is also
    # the fold scored below, so the reported scores are not fully independent
    # of training.
    history = model.fit(X[train], ycat[train], batch_size=8, epochs=100,
                        callbacks=[es], verbose=1,
                        validation_data=(X[test], ycat[test]))

    if i == 0:
        # summary() prints the table itself and returns None, so it is not
        # wrapped in print().
        model.summary(90)
        #plot_model(model, to_file='model_plot.png', show_shapes=True, show_layer_names=True)

    # To plot each fold.
    hist = pd.DataFrame(history.history)

    # 1: accuracy on the training and validation data over the epochs.
    # The history key is 'acc' in older Keras and 'accuracy' in newer releases.
    acc_key = 'acc' if 'acc' in history.history else 'accuracy'
    pyplot.plot(history.history[acc_key])
    pyplot.plot(history.history['val_' + acc_key])
    pyplot.title('model train vs validation accuracy')
    pyplot.ylabel('acc')
    pyplot.xlabel('epoch')
    pyplot.legend(['train', 'validation'], loc='upper right')
    pyplot.show()

    # 2: loss on the training and validation data over the epochs.
    pyplot.plot(history.history['loss'])
    pyplot.plot(history.history['val_loss'])
    pyplot.title('model train vs validation loss')
    pyplot.ylabel('loss')
    pyplot.xlabel('epoch')
    pyplot.legend(['train', 'validation'], loc='upper right')
    pyplot.show()

    # Confusion-matrix counters, reset for every fold.
    Mytp = 0
    Myfp = 0
    Myfn = 0
    Mytn = 0

    # Predict the whole held-out fold (its real size, not a fixed 49 sentences)
    # in one call and take the most probable class per token.
    X_test = X[test]
    y_test = y[test]
    fold_predictions = np.argmax(model.predict(X_test), axis=-1)

    # UTF-8 is set explicitly because the report contains Arabic words.
    with open('rnnn.txt', 'a', encoding='utf-8') as f:
        for word_ids, Mytrue, Mypredicted in zip(X_test, y_test, fold_predictions):
            f.write("{:15} ({:5}): {} \r\n".format("Word", "True", "Pred"))

            # Predictions for the real (non-padding) tokens of this sentence.
            Mypredicted2 = []
            for w, pred in zip(word_ids, Mypredicted):
                for word in idx_to_words.get(w, ()):
                    if word != "ENDPAD":
                        Mypredicted2.append(pred)
                        # Labels follow the encoding used for y: 0 is "i",
                        # 1 is "c".
                        f.write("{:15}: {} \r\n".format(
                            word, "i" if pred == 0 else "c"))

            # Padding sits at the end, so the first len(Mypredicted2) true
            # labels belong to the real tokens; zip stops there.
            for true, pred in zip(Mytrue, Mypredicted2):
                if true == 1 and pred == 1:
                    Mytp += 1
                elif true == 1 and pred == 0:
                    Myfn += 1
                elif true == 0 and pred == 1:
                    Myfp += 1
                elif true == 0 and pred == 0:
                    Mytn += 1

    # Scores for this fold; every ratio falls back to 0 on an empty denominator.
    total = Mytp + Mytn + Myfp + Myfn
    Myprecision = Mytp / (Mytp + Myfp) if (Mytp + Myfp) != 0 else 0
    Myrecall = Mytp / (Mytp + Myfn) if (Mytp + Myfn) != 0 else 0
    Myaccuracy = (Mytp + Mytn) / total if total != 0 else 0
    # F-beta with beta^2 = 0.25 (precision weighted above recall).
    f_denominator = (0.25 * Myprecision) + Myrecall
    MyF = ((1.25) * ((Myprecision * Myrecall) / f_denominator)
           if f_denominator != 0 else 0)

    print("Accuracy: {}".format(Myaccuracy*100))
    print("Precision: {}".format(Myprecision*100))
    print("Recall: {}".format(Myrecall*100))
    print("F: {}".format(MyF*100))
    with open('rnnn.txt', 'a', encoding='utf-8') as f:
        f.write("FOLD TP:{} TN:{} FP:{} FN:{} A:{} P:{} R:{} F:{} \r\n".format(
            Mytp, Mytn, Myfp, Myfn,
            Myaccuracy*100, Myprecision*100, Myrecall*100, MyF*100))

    # Record this fold's scores.
    Finalprecision.append(Myprecision*100)
    Finalrecall.append(Myrecall*100)
    Finalaccuracy.append(Myaccuracy*100)
    FinalF.append(MyF*100)
################### | |
# Hand the per-sentence prediction log to the browser (Colab only).
files.download('rnnn.txt')

# Raw per-fold scores; there should be one value per fold (10).
for fold_scores in (Finalaccuracy, Finalprecision, Finalrecall):
    print(fold_scores)

# Mean and standard deviation of every metric across the folds.
for template, scores in (
    ("Accuracy all: %.2f%% (+/- %.2f%%)", Finalaccuracy),
    ("Precision: %.2f%% (+/- %.2f%%)", Finalprecision),
    ("Recall: %.2f%% (+/- %.2f%%)", Finalrecall),
    ("F: %.2f%% (+/- %.2f%%)", FinalF),
):
    print(template % (numpy.mean(scores), numpy.std(scores)))
############## | |
from keras.models import Model | |
# Build a truncated model that stops at the embedding layer, to inspect the
# vectors it produces for one encoded sentence.
layer_name = 'Embedding'
embedding_layer = model.get_layer(layer_name)
intermediate_layer_model = Model(inputs=model.input,
                                 outputs=embedding_layer.output)
# NOTE(review): `Q` is not defined anywhere in the visible source, so this line
# fails with a NameError unless an earlier cell creates it; presumably an array
# of encoded sentences such as X was meant -- confirm.
single_sentence_batch = np.array([Q[i]])
intermediate_output = intermediate_layer_model.predict(single_sentence_batch)
intermediate_output
############### | |
# To plot multiple runs: one figure for accuracy, then one for loss, each with
# the training curve in blue and the validation curve in orange.
for figure_title, y_axis_label, train_curve, validation_curve in (
    ('model train vs validation accuracy', 'acc', train2, val2),
    ('model train vs validation loss', 'loss', train1, val1),
):
    pyplot.plot(train_curve, color='blue', label='train')
    pyplot.plot(validation_curve, color='orange', label='validation')
    pyplot.title(figure_title)
    pyplot.ylabel(y_axis_label)
    pyplot.xlabel('epoch')
    #pyplot.legend(['train', 'validation'], loc='upper right')
    pyplot.show()
############### | |
from google.colab import files

# Save the model for deployment.
# Architecture only, serialized as JSON.
model_json = model.to_json()
with open("model.json", "w") as json_file:
    json_file.write(model_json)

# Weights only (HDF5), then the complete model in a single HDF5 file.
model.save_weights("modelw.h5")
model.save("model.h5")
print("Saved model to disk")

# Download the complete model through the browser (Colab only).
#files.download('model.json')
files.download('model.h5')
Laying out notebook... |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment