Code for "Detecting Errors in Arabic Text using Neural Sequence Labeling"
from google.colab import files
uploaded = files.upload()
for fn in uploaded.keys():
    print('User uploaded file "{name}" with length {length} bytes'.format(
        name=fn, length=len(uploaded[fn])))
#############
import tensorflow as tf
import keras
print(keras.__version__)
print(tf.__version__)
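# Note: the session and seeding calls below (tf.ConfigProto, tf.Session,
# tf.set_random_seed) are TensorFlow 1.x APIs; under TensorFlow 2.x they are
# available under tf.compat.v1 instead.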
# Use a fixed seed for the random number generators to address randomness and get reproducible results with Keras; the particular seed values do not matter much.
import numpy as np
import random as rn
# The below is necessary for starting Numpy generated random numbers
# in a well-defined initial state.
np.random.seed(42)
# The below is necessary for starting core Python generated random numbers
# in a well-defined state.
rn.seed(42)
# Force TensorFlow to use single thread.
# Multiple threads are a potential source of non-reproducible results.
# For further details, see: https://stackoverflow.com/questions/42022950/
session_conf = tf.ConfigProto(intra_op_parallelism_threads=1,
                              inter_op_parallelism_threads=1)
from keras import backend as K
# The below tf.set_random_seed() will make random number generation
# in the TensorFlow backend have a well-defined initial state.
# For further details, see:
# https://www.tensorflow.org/api_docs/python/tf/set_random_seed
tf.set_random_seed(42)
sess = tf.Session(graph=tf.get_default_graph(), config=session_conf)
K.set_session(sess)
# Setting PYTHONHASHSEED for determinism was not listed anywhere for TensorFlow,
# but apparently it is necessary for the Theano backend
# (https://github.com/fchollet/keras/issues/850).
# Note: to take full effect, PYTHONHASHSEED has to be set before the Python
# process starts; setting it here only affects child processes.
import os
os.environ['PYTHONHASHSEED'] = '0'
#############
import pandas as pd
import numpy as np
#data = pd.read_csv("Arabic DS.txt", encoding="utf-8")
data = pd.read_csv("Arabic DS Aug.txt", encoding="utf-8")
data = data.fillna(method="ffill")
data.tail(10)
print(data)
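# The data file is expected to be a comma-separated table with one token per row
# and at least the columns "Sentence #", "Word" and "Tag"; a minimal sketch with
# made-up tokens (the variable name `example` is only for illustration):
example = pd.DataFrame({
    "Sentence #": ["Sentence: 1", None, None],
    "Word": ["w1", "w2", "w3"],
    "Tag": ["c", "c", "i"],
})
example = example.fillna(method="ffill")  # same forward-fill as applied to the real data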
#############
# for counting only
words = list(set(data["Word"].values))
words.append("ENDPAD")
n_words = len(words)
print(n_words)
tags = list(set(data["Tag"].values))
n_tags = len(tags)
print(n_tags)
##############
"""
We use the SentenceGetter class to retrieve sentences with their labels.
"""
class SentenceGetter(object):
    def __init__(self, data):
        self.n_sent = 1
        self.data = data
        self.empty = False
        agg_func = lambda s: [(w, t) for w, t in zip(s["Word"].values.tolist(),
                                                     s["Tag"].values.tolist())]
        self.grouped = self.data.groupby("Sentence #").apply(agg_func)
        self.sentences = [s for s in self.grouped]

    def get_next(self):
        try:
            s = self.grouped["Sentence: {}".format(self.n_sent)]
            self.n_sent += 1
            return s
        except:
            return None
###################
getter = SentenceGetter(data)
sent = getter.get_next()
print(sent)
sentences = getter.sentences
import matplotlib.pyplot as plt
plt.style.use("ggplot")
plt.hist([len(s) for s in sentences], bins=50)
plt.title('Sentence Lengths')
plt.ylabel('sentences')
plt.xlabel('lengths')
#plt.legend()
plt.show()
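# Quick sanity check (sketch): the longest sentence length, which motivates the
# choice of max_len below; pad_sequences truncates any longer sentence.
print("longest sentence:", max(len(s) for s in sentences))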
##################
from keras.preprocessing.sequence import pad_sequences
word2idx = pd.read_csv("word2idxArA.txt", encoding="utf-8")
indexed=[(w, n) for w, n in zip(word2idx["Word"].values.tolist(), word2idx["No"].values.tolist())]
print(indexed)
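# Optional speed-up (sketch): the nested linear scans over `indexed` further down
# can be replaced by a dictionary lookup; `word_to_no` is a hypothetical helper name.
word_to_no = dict(indexed)
# e.g. XX.append(word_to_no[w[0]]) instead of scanning `indexed` for every word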
##################
from keras.preprocessing.sequence import pad_sequences
max_len = 17
XX = []
X = []
for s in sentences:
    XX = []
    #print(s)
    for w in s:
        #print(w)
        for I in indexed:
            #print(I)
            #print(I[0])
            #print(w[0])
            if I[0] == w[0]:
                XX.append(I[1])
    #print(XX)
    X.append(XX)
#print(X)
print(X[0])
print(len(X))
X = pad_sequences(maxlen=max_len, sequences=X, padding="post", value=n_words - 1)  # pad with the last word of the index
# n_words - 1 is the index of "ENDPAD" in word2idx; cleaner would be to look up
# word2idx["ENDPAD"] and use it as the padding value (see the sketch below).
print(X[0])
print(X.shape[1])
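# Sketch of the cleaner padding value mentioned above: look up the index of "ENDPAD"
# from the loaded word2idx table instead of assuming it is n_words - 1.
#endpad_idx = dict(indexed).get("ENDPAD", n_words - 1)
#X = pad_sequences(maxlen=max_len, sequences=X, padding="post", value=endpad_idx)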
#################
yy = []
y = []
for s in sentences:
    yy = []
    #print(s)
    for w in s:
        #print(w)
        if w[1] == "i":
            yy.append(0)
        else:
            yy.append(1)
    #print(yy)
    y.append(yy)
print(y)
y = pad_sequences(maxlen=max_len, sequences=y, padding="post", value=0)  # pad with zeros (value=0)
print(y[0])
###############
import keras.backend.tensorflow_backend as tfb
#Not using the following weighted loss causes FN = FP and thus precision = recall.
"""
weighted_binary_crossentropy allows setting class weights for the binary classification,
i.e. making positive errors cost more (or less) than negative errors. This is useful when the training data is imbalanced.
"""
#POS_WEIGHT = .4 # multiplier for positive targets, needs to be tuned (N/P) .4 for Arabic DS & .28 for Arabic DS Aug
POS_WEIGHT = .28
def weighted_binary_crossentropy(target, output):  # pos_weight trades off recall and precision by up- or down-weighting the cost of a positive error relative to a negative error
    """
    Weighted binary crossentropy between an output tensor and a target tensor.
    POS_WEIGHT is used as a multiplier for the positive targets.
    pos_weight: a coefficient applied to the positive examples.
    A value pos_weight > 1 decreases the false negative count, hence increasing the recall.
    Conversely, pos_weight < 1 decreases the false positive count and increases the precision.
    Combination of the following functions:
    * keras.losses.binary_crossentropy
    * keras.backend.tensorflow_backend.binary_crossentropy
    * tf.nn.weighted_cross_entropy_with_logits
    """
    # transform the sigmoid probabilities back to logits
    _epsilon = tfb._to_tensor(tfb.epsilon(), output.dtype.base_dtype)
    output = tf.clip_by_value(output, _epsilon, 1 - _epsilon)
    output = tf.log(output / (1 - output))
    # compute the weighted loss
    loss = tf.nn.weighted_cross_entropy_with_logits(targets=target,
                                                    logits=output,
                                                    pos_weight=POS_WEIGHT)
    return tf.reduce_mean(loss, axis=-1)
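# For reference, a small NumPy sketch (illustration only; `weighted_bce_numpy` is a
# hypothetical helper) of the formula tf.nn.weighted_cross_entropy_with_logits applies,
# showing how POS_WEIGHT scales the loss on positive targets:
#   loss = -(pos_weight * t * log(p) + (1 - t) * log(1 - p))
def weighted_bce_numpy(t, p, pos_weight=POS_WEIGHT, eps=1e-7):
    p = np.clip(p, eps, 1 - eps)
    return -(pos_weight * t * np.log(p) + (1 - t) * np.log(1 - p))
# e.g. weighted_bce_numpy(np.array([1., 0.]), np.array([0.9, 0.2]))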
#################
from keras.models import Model, Input
from keras.layers import LSTM, Embedding, Dense, TimeDistributed, Dropout,Reshape, SimpleRNN, Bidirectional
from keras.utils import to_categorical, plot_model
import numpy
from matplotlib import pyplot
import sklearn.model_selection
from sklearn.model_selection import KFold
from pandas import DataFrame
from keras.callbacks import EarlyStopping
from google.colab import files
es = EarlyStopping(monitor='val_loss', patience=2)  # early stopping prevents overfitting and eliminates the need for epoch tuning by stopping when the validation loss starts to increase
seed = 42
numpy.random.seed(seed)
train1 = DataFrame()
val1 = DataFrame()
train2 = DataFrame()
val2 = DataFrame()
Finalprecision=[]
Finalrecall=[]
Finalaccuracy=[]
FinalF=[]
i=0
kf = sklearn.model_selection.KFold(n_splits=10, shuffle=True, random_state=seed)  # k=10, shuffles the data; a fixed random_state guarantees that every run produces the same splits
for train, test in kf.split(X, y):
    print('Train: %s | test: %s' % (train, test))
    if i == 0:
        ycat = to_categorical(y, num_classes=n_tags)  # For training the network we also need to one-hot encode the labels y; with 2 classes, label 1 always goes into the second column.
    input = Input(shape=(max_len,))  # This returns a tensor. The trailing comma is necessary when there is only one dimension.
    model = Embedding(input_dim=n_words, output_dim=100, input_length=max_len, name="Embedding")(input)  # output_dim is the embedding size: this layer encodes the input sequence into a sequence of dense 100-dimensional vectors.
    #model = Dropout(0.2)(model)
    #model = (LSTM(units=125, return_sequences=True))(model)  # recurrent_dropout sets the dropout rate of the recurrent units; dropout sets the dropout rate for the input units of the layer
    #model = (LSTM(units=125, return_sequences=True))(model)
    model = Bidirectional(LSTM(units=100, return_sequences=True))(model)
    #model = Bidirectional(LSTM(units=25, return_sequences=True, recurrent_dropout=0.2))(model)  #, merge_mode='concat'
    #model = SimpleRNN(units=100, return_sequences=True, recurrent_dropout=0.3)(model)  # A rule of thumb is to have the number of hidden units in between the number of input units (output_dim) and the number of output classes (2)
    #model = Dropout(0.2)(model)
    #model = SimpleRNN(units=100, return_sequences=True, recurrent_dropout=0.2)(model)
    out = TimeDistributed(Dense(n_tags, activation="sigmoid"))(model)  # sigmoid output layer
    model = Model(input, out)
    model.compile(optimizer="nadam", loss=weighted_binary_crossentropy, metrics=["accuracy"])
    history = model.fit(X[train], ycat[train], batch_size=8, epochs=100, callbacks=[es], verbose=1, validation_data=(X[test], ycat[test]))  # records the accuracy on the training and validation data for each fold
    if i == 0:
        print(model.summary(90))
        #plot_model(model, to_file='model_plot.png', show_shapes=True, show_layer_names=True)
    i += 1
    # To plot each fold
    #1: A plot of accuracy on the training and validation datasets over training epochs.
    hist = pd.DataFrame(history.history)
    pyplot.plot(history.history['acc'])
    pyplot.plot(history.history['val_acc'])
    pyplot.title('model train vs validation accuracy')
    pyplot.ylabel('acc')
    pyplot.xlabel('epoch')
    pyplot.legend(['train', 'validation'], loc='upper right')  # train is 'acc'; validation is 'val_acc'
    pyplot.show()
    #2: A plot of loss on the training and validation datasets over training epochs.
    pyplot.plot(history.history['loss'])
    pyplot.plot(history.history['val_loss'])
    pyplot.title('model train vs validation loss')
    pyplot.ylabel('loss')
    pyplot.xlabel('epoch')
    pyplot.legend(['train', 'validation'], loc='upper right')
    pyplot.show()
    # zero out the counters for each fold
    Myprecision = 0
    Myrecall = 0
    Myaccuracy = 0
    Mytp = 0
    Myfp = 0
    Myfn = 0
    Mytn = 0
    # Print all predictions of this fold and compute the counts manually:
    for j in range(len(test)):  # loop variable renamed from i to j to avoid shadowing the fold counter; the original iterated over a fixed range(0, 49)
        p = model.predict(np.array([X[test][j]]))
        ##print([X[test][j]])
        ##print(p)
        p = np.argmax(p, axis=-1)
        ##print([y[test][j]])
        ##print(p)
        Mypredicted = p[0]
        Mytrue = np.array([y[test][j]])[0]
        Mypredicted2 = []
        Mytrue2 = []
        ##print(Mypredicted)
        ##print(Mytrue)
        with open('rnnn.txt', 'a') as f:
            f.write("{:15} ({:5}): {} \r\n".format("Word", "True", "Pred"))
        ##print("{:15} ({:5}): {}".format("Word", "True", "Pred"))
        for w, pred in zip(X[test][j], p[0]):
            for I in indexed:
                if I[1] == w:
                    if I[0] != "ENDPAD":
                        Mypredicted2.append(pred)
                        ##print("{:15}: {}".format(I[0], tags[pred]))
                        with open('rnnn.txt', 'a') as f:
                            f.write("{:15}: {} \r\n".format(I[0], "i" if pred == 0 else "c"))  # write the tag name directly (0 -> "i", 1 -> "c"); tags[pred] relies on an arbitrary set() ordering
        length = len(Mypredicted2)
        z = 0
        while length > 0:
            Mytrue2.append(Mytrue[z])
            z += 1
            length -= 1
        ##print(Mytrue2)
        for true, pred in zip(Mytrue2, Mypredicted2):  # over the entire array of each sentence
            #print(Mytrue)
            #print(Mypredicted)
            # labels: i = 0 and c = 1
            if true == 1:
                if pred == 1:
                    Mytp += 1
                elif pred == 0:
                    Myfn += 1
            elif true == 0:
                if pred == 1:
                    Myfp += 1
                elif pred == 0:
                    Mytn += 1
        ##print(Mytp)
        ##print(Myfp)
        ##print(Myfn)
        ##print(Mytn)
    # for every fold print:
    Myprecision = Mytp / (Mytp + Myfp) if (Mytp + Myfp) != 0 else 0
    Myrecall = Mytp / (Mytp + Myfn) if (Mytp + Myfn) != 0 else 0
    Myaccuracy = (Mytp + Mytn) / (Mytp + Mytn + Myfp + Myfn) if (Mytp + Mytn + Myfp + Myfn) != 0 else 0
    # F0.5 score: (1 + 0.5^2) * P * R / (0.5^2 * P + R); it weights precision more heavily than recall
    MyF = (1.25 * Myprecision * Myrecall / (0.25 * Myprecision + Myrecall)) if (0.25 * Myprecision + Myrecall) != 0 else 0
    print("Accuracy: {}".format(Myaccuracy * 100))
    print("Precision: {}".format(Myprecision * 100))
    print("Recall: {}".format(Myrecall * 100))
    print("F: {}".format(MyF * 100))
    with open('rnnn.txt', 'a') as f:
        f.write("FOLD TP:{} TN:{} FP:{} FN:{} A:{} P:{} R:{} F:{} \r\n".format(Mytp, Mytn, Myfp, Myfn, Myaccuracy * 100, Myprecision * 100, Myrecall * 100, MyF * 100))
    # collect this fold's scores for the final averages
    Finalprecision.append(Myprecision * 100)  # precision of every fold
    Finalrecall.append(Myrecall * 100)
    Finalaccuracy.append(Myaccuracy * 100)
    FinalF.append(MyF * 100)
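# Optional cross-check (sketch): the manually computed per-fold precision/recall/F0.5
# could also be obtained from scikit-learn by collecting all non-ENDPAD (true, predicted)
# label pairs of a fold into y_true_all / y_pred_all (hypothetical lists) and calling:
#   from sklearn.metrics import precision_recall_fscore_support
#   precision_recall_fscore_support(y_true_all, y_pred_all, beta=0.5, average='binary')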
###################
files.download('rnnn.txt')
###############
#for all
print(Finalaccuracy)  # should contain 10 values, one per fold
print(Finalprecision)
print(Finalrecall)
print("Accuracy all: %.2f%% (+/- %.2f%%)" % (numpy.mean(Finalaccuracy), numpy.std(Finalaccuracy)))
print("Precision: %.2f%% (+/- %.2f%%)" % (numpy.mean(Finalprecision), numpy.std(Finalprecision)))
print("Recall: %.2f%% (+/- %.2f%%)" % (numpy.mean(Finalrecall), numpy.std(Finalrecall)))
print("F: %.2f%% (+/- %.2f%%)" % (numpy.mean(FinalF), numpy.std(FinalF)))
##############
from keras.models import Model
layer_name = 'Embedding'
intermediate_layer_model = Model(inputs=model.input,
                                 outputs=model.get_layer(layer_name).output)
# Note: Q is not defined in this script; any padded input sequence works, e.g. a row of X.
intermediate_output = intermediate_layer_model.predict(np.array([X[0]]))
print(intermediate_output)
###############
# To plot multiple runs
# Note: train1/train2/val1/val2 are created above but never populated in this script;
# fill them with the per-fold history values before plotting.
#1: A plot of accuracy on the training and validation datasets over training epochs.
pyplot.plot(train2, color='blue', label='train')
pyplot.plot(val2, color='orange', label='validation')
pyplot.title('model train vs validation accuracy')
pyplot.ylabel('acc')
pyplot.xlabel('epoch')
#pyplot.legend(['train', 'validation'], loc='upper right')
pyplot.show()
#2:A plot of loss on the training and validation datasets over training epochs.
pyplot.plot(train1, color='blue', label='train')
pyplot.plot(val1, color='orange', label='validation')
pyplot.title('model train vs validation loss')
pyplot.ylabel('loss')
pyplot.xlabel('epoch')
#pyplot.legend(['train', 'validation'], loc='upper right')
pyplot.show()
###############
#Save the model for deployment
# serialize model to JSON
model_json = model.to_json()
with open("model.json", "w") as json_file:
json_file.write(model_json)
# serialize weights to HDF5
model.save_weights("modelw.h5")
model.save("model.h5")
print("Saved model to disk")
#download the files
from google.colab import files
#files.download('model.json')
files.download('model.h5')
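# Sketch of how the saved model could be loaded back later; since it was compiled with
# the custom weighted_binary_crossentropy loss, that loss has to be passed via
# custom_objects (or the model loaded with compile=False).
#from keras.models import load_model
#loaded = load_model("model.h5", custom_objects={"weighted_binary_crossentropy": weighted_binary_crossentropy})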