# Final project v1. (Inspiration from babi_rnn by the Keras team - taken from a Jupyter Notebook)
import os
import numpy as np
import pandas as pd
from keras.preprocessing.text import text_to_word_sequence, Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.models import Sequential, Model
from keras.layers import Dense, Flatten, Embedding, Input, Concatenate, Add, RepeatVector, RNN, LSTM, Dot, Dropout
from keras.optimizers import Adam, SGD
from keras.metrics import categorical_accuracy
from itertools import chain
# Get all the files from dir
files = os.listdir()
all_training_files = []
all_testing_files = []
for fn in files:
    if 'train' in fn:
        all_training_files.append(fn)
    if 'test' in fn:
        all_testing_files.append(fn)
# Run sort
all_training_files = np.asarray(sorted(all_training_files))
all_testing_files = np.asarray(sorted(all_testing_files))
# Get dictionary of tasks
training_tasks_dict = dict((k + 1, v) for k, v in enumerate(all_training_files))
testing_tasks_dict = dict((k + 1, v) for k, v in enumerate(all_testing_files))
# Get first task
task_training = training_tasks_dict[1]
task_testing = testing_tasks_dict[1]
def txt_to_raw(file_name):
    '''
    take in a file_name and return the list of raw text lines it contains
    '''
    with open(file_name, 'r') as file:
        raw_corpus = file.readlines()
        return raw_corpus
# Convert each task file into a list of raw lines
task_training_corpus = txt_to_raw(task_training)
task_testing_corpus = txt_to_raw(task_testing)
def parse_story(story):
    '''
    parse the raw lines of a task file into (substory, question, answer) tuples
    '''
    related_content = []
    data = []
    for line in story:
        line_id, line = line.split(' ', 1)
        line_id = int(line_id)
        # A line_id of 1 marks the start of a new story, so reset the accumulated content
        if line_id == 1:
            related_content = []
        # If there is a \t in the line, it is a question/answer line
        if '\t' in line:
            # The supporting_facts line numbers are not used here
            question, answer, supporting_facts = line.split('\t')
            # Use keras text_to_word_sequence for easy filtering/tokenization
            question = text_to_word_sequence(question, filters='?\n')
            answer = [answer]
            substory = [ss for ss in related_content if ss]
            # Append the (substory, question, answer) tuple to the data
            data.append((substory, question, answer))
            related_content.append('')
        else:
            # Tokenize the statement line and keep the sentence-final period as a token
            line = text_to_word_sequence(line, filters='.\n') + ['.']
            for word in line:
                related_content.append(word)
    return data
# Parse the data
training_data = parse_story(task_training_corpus)
testing_data = parse_story(task_testing_corpus)
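# A quick sanity check (a hedged sketch, not in the original notebook): print the first parsed
# training example to confirm the (substory, question, answer) structure, e.g. roughly
# (['mary', 'moved', 'to', 'the', 'bathroom', '.', ...], ['where', 'is', 'mary'], ['bathroom'])
# for bAbI task 1.
print(f'Example parsed training tuple: {training_data[0]}')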
def get_unique_vocab(file_name):
    '''
    generate the unique vocab
    '''
    with open(file_name, 'r') as file:
        raw_corpus = file.read()
        # Filter out newlines, tabs, question marks, digits (the line ids) and periods
        tokenized = text_to_word_sequence(raw_corpus, filters='\n\t?0123456789.')
        return set(tokenized + ['.'])
vocab = get_unique_vocab(task_training)
vocab_maxlen = len(vocab) + 1
story_maxlen = max(map(len, [s for s, _, _ in training_data]))
question_maxlen = max(map(len, [q for _, q, _ in training_data]))
# Create dictionary of words to indices
word_index = dict((c, i + 1) for i, c in enumerate(vocab))
# Get the array of index to words
index_words = [''] + list(vocab)
# Set limits
sentence_limit = story_maxlen
vocab_size = vocab_maxlen
def data_to_vector(data, word_dictionary, vocab_size, sentence_limit):
    '''
    Stories and questions are converted into sequences of word indices and the answers
    are encoded as single word indices.
    Each word is replaced by its numeric index from the word dictionary, giving a purely
    numeric representation, and the story/question sequences are padded to fixed lengths.
    '''
    STORY_VECTOR, QUESTION_VECTOR, ANSWER_VECTOR = [], [], []
    for story, question, answer in data:
        # Encode the story representations
        STORY_VECTOR.append([word_dictionary[word] for word in story])
        # Encode the question representations
        QUESTION_VECTOR.append([word_dictionary[word] for word in question])
        ANSWER_VECTOR.append(word_dictionary[answer[0]])
    return pad_sequences(STORY_VECTOR, maxlen=story_maxlen), pad_sequences(QUESTION_VECTOR, maxlen=question_maxlen), np.array(ANSWER_VECTOR)
# Get s,q,a of training and testing for model
story_training_input, question_training_input, answer_training_input = data_to_vector(training_data, word_index, vocab_size, sentence_limit)
story_testing_input, question_testing_input, answer_testing_input = data_to_vector(testing_data, word_index, vocab_size, sentence_limit)
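# A quick shape check (a hedged sketch, not in the original notebook): the story and question
# inputs should be 2-D arrays padded to story_maxlen and question_maxlen respectively, and the
# answers a 1-D array of word indices.
print(f'Stories: {story_training_input.shape}, questions: {question_training_input.shape}, '
      f'answers: {answer_training_input.shape}')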
def test_encodings(story_encoding):
    '''
    helper function to test encodings
    '''
    decoded = []
    for encoded_word in story_encoding:
        decoded.append(index_words[encoded_word])
    return ' '.join(decoded)
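# Example usage (a hedged sketch, assuming the padding index 0 decodes to the empty string):
# decoding the first padded story should reproduce its words, with leading blanks for padding.
print(test_encodings(story_training_input[0]))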
# Combine stories and questions into single sequences to feed into the model
zipped_sq_training = zip(story_training_input, question_training_input)
sq_training_combined = []
for sq in zipped_sq_training:
    sq_training_combined.append(list(chain(sq[0], sq[1])))
# Get max length of story + question length in combined vector
combined_maxlen = max(map(len, sq_training_combined))
# Combined Model
# Create the Input layer with size combined_maxlen, which is the combined length of
# the stories and the questions
sq_in = Input(shape=(combined_maxlen,), dtype='int32')
# Create the embedding layer for the combined story + question word indices, which takes
# its input from the Input layer above
sq_emb = Embedding(input_dim=vocab_maxlen, output_dim=50)(sq_in)
# Pass the embeddings into a hidden layer
sq_hidden = Dense(vocab_maxlen, activation='relu')(sq_emb)
# Connect the hidden layer to an LSTM layer
lstm_out = LSTM(50)(sq_hidden)
# Final output layer
dense_out = Dense(vocab_maxlen, activation='softmax')(lstm_out)
model = Model(sq_in, dense_out)
model.compile(loss='sparse_categorical_crossentropy', optimizer=Adam(lr=0.01), metrics=['accuracy'])
model.fit(np.array(sq_training_combined), np.array(answer_training_input), epochs=10)
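# Evaluation on the held-out test task (a hedged sketch, not in the original notebook): the test
# stories and questions have to be combined the same way as the training data before evaluating.
sq_testing_combined = [list(chain(s, q)) for s, q in zip(story_testing_input, question_testing_input)]
test_loss, test_acc = model.evaluate(np.array(sq_testing_combined), np.array(answer_testing_input))
print(f'Combined model test accuracy: {test_acc:.3f}')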
#### SECOND MODEL UTILIZING TWO EMBEDDING LAYERS #####
# Create Input Layer for the Stories
# The Input layer has story_maxlen neurons (66 for this task) and so it takes in a vector of shape=(story_maxlen,)
story_input = Input(shape=(story_maxlen,), dtype='int32')
print(f'The input layer for the stories has a total of {story_maxlen} neurons to take in a vector of {story_maxlen} x 1')
# Create an Embedding layer to create the word embeddings for the story inputs
# Takes the story_input and maps it through a dense embedding matrix of dimensions vocab_maxlen x 50 (21 x 50 here)
s_layer = Embedding(input_dim=vocab_maxlen, output_dim=50)(story_input)
print(f'The embedding layer has the same number of rows as the max vocabulary length for all unique words: {vocab_maxlen}')
s_layer = Flatten()(s_layer)
# Create Input Layer for the Questions
# The questions have a max length of question_maxlen (3 for this task), so the Input layer has that many neurons
questions_input = Input(shape=(question_maxlen,), dtype='int32')
# Create questions Embedding layer
q_layer = Embedding(input_dim=vocab_maxlen, output_dim=50)(questions_input)
# Flatten out the embeddings to single vectors and concatenate them
q_layer = Flatten()(q_layer)
# Concatenate the two flattened vectors
merge = Concatenate()([s_layer, q_layer])
out = Dense(50, activation='relu', use_bias=True)(merge)
out = Dropout(0.10)(out)
out = Dense(vocab_maxlen, activation='softmax')(out)
model = Model([story_input, questions_input], out)
model.compile(loss='sparse_categorical_crossentropy', optimizer=Adam(lr=0.001), metrics=['accuracy'])
model.summary()
print(f'The input layer for the questions has a total of {question_maxlen} neurons to take in a vector of {question_maxlen} x 1')
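# Training and evaluation for the two-input model (a hedged sketch, not in the original notebook):
# the stories and questions are fed as a list of two inputs, with the answer indices as sparse targets.
model.fit([story_training_input, question_training_input], answer_training_input, epochs=10)
test_loss, test_acc = model.evaluate([story_testing_input, question_testing_input], answer_testing_input)
print(f'Two-embedding model test accuracy: {test_acc:.3f}')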