# Final project v1. (Inspiration from babi_rnn by the Keras team - taken from a Jupyter Notebook)
import os
import numpy as np
import pandas as pd
from keras.preprocessing.text import text_to_word_sequence, Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.models import Sequential, Model
from keras.layers import Dense, Flatten, Embedding, Input, Concatenate, Add, RepeatVector, RNN, LSTM, Dot, Dropout
from keras.optimizers import Adam, SGD
from keras.metrics import categorical_accuracy
from itertools import chain
# Get all the files from dir
files = os.listdir()
all_training_files = []
all_testing_files = []
for fn in files:
    if 'train' in fn:
        all_training_files.append(fn)
    if 'test' in fn:
        all_testing_files.append(fn)
# Run sort
all_training_files = np.asarray(sorted(all_training_files))
all_testing_files = np.asarray(sorted(all_testing_files))
# Get dictionary of tasks
training_tasks_dict = dict((k+1,v) for k,v in enumerate(all_training_files))
testing_tasks_dict = dict((k+1,v) for k,v in enumerate(all_testing_files))
# Get first task
task_training = training_tasks_dict[1]
task_testing = testing_tasks_dict[1]
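# Quick sanity check of which files were picked up for task 1
# (exact file names depend on the local bAbI data directory)
print(f'Training file: {task_training}')
print(f'Testing file: {task_testing}')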
def txt_to_raw(file_name):
    '''
    Take in a file_name and return the raw corpus of the contained text as a list of lines.
    '''
    with open(file_name, 'r') as file:
        raw_corpus = file.readlines()
    return raw_corpus
# Convert to corpus
task_training_corpus = txt_to_raw(task_training)
task_testing_corpus = txt_to_raw(task_testing)
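# Each raw line in a bAbI task file looks roughly like one of the following
# (a line id, then either a statement or a tab-separated question/answer/supporting-fact id):
#   '1 Mary moved to the bathroom.\n'
#   '3 Where is Mary? \tbathroom\t1\n'
print(f'First raw training line: {task_training_corpus[0]!r}')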
def parse_story(story):
    '''
    Parse the raw text corpus into (substory, question, answer) tuples.
    '''
    related_content = []
    data = []
    for line in story:
        # Split the line id off from the rest of the line
        line_id, line = line.split(' ', 1)
        line_id = int(line_id)
        # A line_id of 1 marks the start of a new story, so reset the accumulated content
        if line_id == 1:
            related_content = []
        # If there is a \t in the line, it is a question/answer line
        if '\t' in line:
            # Not sure if we need to utilize the supporting_facts line number
            question, answer, supporting_facts = line.split('\t')
            # Use keras.preprocessing.text.text_to_word_sequence for easy filtering
            question = text_to_word_sequence(question, filters='?\n')
            answer = [answer]
            substory = [ss for ss in related_content if ss]
            # Append the tuple to the data
            data.append((substory, question, answer))
            related_content.append('')
        else:
            # Tokenize the statement line
            line = text_to_word_sequence(line, filters='.\n') + ['.']
            for word in line:
                related_content.append(word)
    return data
# Parse the data
training_data = parse_story(task_training_corpus)
testing_data = parse_story(task_testing_corpus)
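# Sanity check on the parsed tuples: each entry should be ([story words...], [question words...], [answer])
print(f'Number of training samples: {len(training_data)}')
print(f'Number of testing samples: {len(testing_data)}')
print(f'First training sample: {training_data[0]}')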
def get_unique_vocab(file_name):
    '''
    generate the unique vocab
    '''
    with open(file_name, 'r') as file:
        raw_corpus = file.read()
    tokenized = text_to_word_sequence(raw_corpus, filters='\n\t?123456789101112131415.')
    return set(tokenized + ['.'])
vocab = get_unique_vocab(task_training)
vocab_maxlen = len(vocab) + 1
story_maxlen = max(map(len,[s for s,_,_ in training_data]))
question_maxlen = max(map(len,[q for _,q,_ in training_data]))
# Create dictionary of words to indices
word_index = dict((c, i + 1) for i, c in enumerate(vocab))
# Get the array of index to words
index_words = [''] + list(vocab)
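# Round-trip check: index 0 is reserved for padding, so index_words[word_index[w]] should return w
sample_word = next(iter(vocab))
assert index_words[word_index[sample_word]] == sample_word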
# Set limits
sentence_limit = story_maxlen
vocab_size = vocab_maxlen
def data_to_vector(data, word_dictionary, vocab_size, sentence_limit):
    '''
    Vectorize the stories, questions, and answers into pure numeric form.
    Each word is replaced by its index in the word dictionary; the answer is stored
    as a single index (the sparse categorical loss handles the one-hot target).
    '''
    STORY_VECTOR, QUESTION_VECTOR, ANSWER_VECTOR = [], [], []
    for story, question, answer in data:
        # Encode the story representations
        STORY_VECTOR.append([word_dictionary[word] for word in story])
        # Encode the question representations
        QUESTION_VECTOR.append([word_dictionary[word] for word in question])
        # Encode the answer as a single word index
        ANSWER_VECTOR.append(word_dictionary[answer[0]])
    return (pad_sequences(STORY_VECTOR, maxlen=story_maxlen),
            pad_sequences(QUESTION_VECTOR, maxlen=question_maxlen),
            np.array(ANSWER_VECTOR))
# Get s,q,a of training and testing for model
story_training_input, question_training_input, answer_training_input = data_to_vector(
    training_data, word_index, vocab_size, sentence_limit)
story_testing_input, question_testing_input, answer_testing_input = data_to_vector(
    testing_data, word_index, vocab_size, sentence_limit)
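# The encoded inputs are integer index matrices: (num_samples, story_maxlen) for stories,
# (num_samples, question_maxlen) for questions, and (num_samples,) for answers
print(f'Story input shape: {story_training_input.shape}')
print(f'Question input shape: {question_training_input.shape}')
print(f'Answer input shape: {answer_training_input.shape}')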
def test_encodings(story_encoding):
    '''
    Helper function to decode an encoded story back into words for spot-checking the encodings.
    '''
    decoded = []
    for encoded_word in story_encoding:
        decoded.append(index_words[encoded_word])
    return ' '.join(decoded)
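# Example usage: decode the first encoded training story (padding indices decode to the empty string)
print(test_encodings(story_training_input[0]))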
# Combine stories and questions to feed into the model
zipped_sq_training = zip(story_training_input, question_training_input)
sq_training_combined = []
for sq in zipped_sq_training:
    sq_training_combined.append(list(chain(sq[0], sq[1])))
# Get max length of story + question length in combined vector
combined_maxlen = max(map(len,[sq for sq in sq_training_combined]))
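print(f'Combined story + question length: {combined_maxlen}')
# Build the same combined representation for the test set so the trained model can be
# evaluated on unseen data (mirrors the training prep above; not part of the original steps)
sq_testing_combined = []
for sq in zip(story_testing_input, question_testing_input):
    sq_testing_combined.append(list(chain(sq[0], sq[1])))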
# Combined Model
# Create the Input layer with neuron size of combined_maxlen, which is the combined length of
# the padded stories and questions
sq_in = Input(shape=(combined_maxlen,), dtype='int32')
# Now create the embedding layer for the combined words of stories and questions which takes in the input from
# the above Input layer
sq_emb = Embedding(input_dim=vocab_maxlen,output_dim=50)(sq_in)
# Pass embedding into a hidden layer
sq_hidden = Dense(vocab_maxlen, activation='relu')(sq_emb)
# Connect the hidden layer to an LSTM layer
lstm_out = LSTM(50)(sq_hidden)
# Final out layer
dense_out = Dense(vocab_maxlen, activation='softmax')(lstm_out)
model = Model(sq_in,dense_out)
model.compile(loss='sparse_categorical_crossentropy', optimizer=Adam(lr=0.01), metrics=['accuracy'])
model.fit(np.array(sq_training_combined),np.array(answer_training_input),epochs=10)
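# Rough generalization check: evaluate the combined model on the held-out test task
# (assumes sq_testing_combined built above; the exact accuracy will vary between runs)
test_loss, test_acc = model.evaluate(np.array(sq_testing_combined), np.array(answer_testing_input))
print(f'Combined model test accuracy: {test_acc:.3f}')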
#### SECOND MODEL UTILIZING TWO EMBEDDING LAYERS #####
# Create Input Layer for the Stories
# Input Layer has 66 Neurons and so it will be taking in as input a vector of shape=(66,)
story_input = Input(shape=(story_maxlen,), dtype='int32')
print(f'The input layer for the stories has a total of {story_maxlen} neurons to take in a vector of {story_maxlen} x 1')
# Create an Embedding layer to create the word embeddings for the story inputs
# Takes the story_input indices and outputs a dense story_maxlen x 50 matrix of embeddings
s_layer = Embedding(input_dim=vocab_maxlen, output_dim=50)(story_input)
print(f'The embedding layer has the same number of rows as the max vocabulary length for all unique words: {vocab_maxlen}')
s_layer = Flatten()(s_layer)
# Create Input Layer for the Questions
# Input Layer for the questions has a max length of 3, so the Input layer will have 3 neurons
questions_input = Input(shape=(question_maxlen,), dtype='int32')
print(f'The input layer for the questions has a total of {question_maxlen} neurons to take in a vector of {question_maxlen} x 1')
# Create questions Embedding layer
q_layer = Embedding(input_dim=vocab_maxlen, output_dim=50)(questions_input)
# Flatten out the embeddings to single vectors and concatenate them
q_layer = Flatten()(q_layer)
# Concat the two flattened vectors
merge = Concatenate()([s_layer,q_layer])
out = Dense(50,activation='relu',use_bias=True)(merge)
out = Dropout(0.10)(out)
out = Dense(vocab_maxlen,activation='softmax')(out)
model = Model([story_input,questions_input],out)
model.compile(loss='sparse_categorical_crossentropy', optimizer=Adam(lr=0.001), metrics=['accuracy'])
model.summary()
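# Train and evaluate the two-embedding model (a minimal sketch; the epoch count and
# validation split are arbitrary choices, not taken from the original notebook)
model.fit([story_training_input, question_training_input], answer_training_input,
          epochs=10, validation_split=0.1)
test_loss, test_acc = model.evaluate([story_testing_input, question_testing_input], answer_testing_input)
print(f'Two-embedding model test accuracy: {test_acc:.3f}')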