"""Adapted from https://stackoverflow.com/a/53664580/3249688"""
import numpy as np
from keras.preprocessing import sequence
from keras.models import Sequential
from keras.layers import Dense, Dropout, Embedding, LSTM, Bidirectional
from keras.datasets import imdb
def load_data(n_unique_words=None, maxlen=None):
    # https://stackoverflow.com/a/56243777/3249688
    (x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=n_unique_words)
    x_train = sequence.pad_sequences(x_train, maxlen=maxlen)
    x_test = sequence.pad_sequences(x_test, maxlen=maxlen)
    y_train = np.array(y_train)
    y_test = np.array(y_test)
    return x_train, y_train, x_test, y_test
if __name__ == "__main__":
    np_load_old = np.load
    np.load = lambda *args, **kwargs: np_load_old(*args, allow_pickle=True, **kwargs)
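    # keras.datasets.imdb.load_data in this Keras version calls np.load without
    # allow_pickle, which fails under NumPy >= 1.16.3 where allow_pickle defaults
    # to False; the monkeypatch above forces allow_pickle=True and the original
    # np.load is restored at the end of the script.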
    N_UNIQUE_WORDS = 10000  # cut texts after this number of words
    MAXLEN = 200
    BATCH_SIZE = 1024
    x_train, y_train, x_test, y_test = load_data(
        n_unique_words=N_UNIQUE_WORDS, maxlen=MAXLEN
    )
    model = Sequential(
        [
            Embedding(N_UNIQUE_WORDS, 128, input_length=MAXLEN),
            Bidirectional(LSTM(64)),
            Dropout(0.5),
            Dense(1, activation="sigmoid"),
        ]
    )
    model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
    print("Train...")
    model.fit(
        x_train,
        y_train,
        batch_size=BATCH_SIZE,
        epochs=4,
        validation_data=[x_test, y_test],
    )
    print("Evaluate...")
    model.evaluate(x_test, y_test, batch_size=BATCH_SIZE)
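    # A variant sketch (not in the original gist): mask the zero padding that
    # pad_sequences adds so the BiLSTM skips padded timesteps. mask_zero=True is
    # a standard Embedding option; whether it changes the score here is untested.
    model_masked = Sequential(
        [
            Embedding(N_UNIQUE_WORDS, 128, input_length=MAXLEN, mask_zero=True),
            Bidirectional(LSTM(64)),
            Dropout(0.5),
            Dense(1, activation="sigmoid"),
        ]
    )
    model_masked.compile(
        loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"]
    )
    # model_masked.fit(...) would mirror the model.fit call above.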
    # Remove the restriction on the number of unique words.
    x_train2, y_train2, x_test2, y_test2 = load_data(n_unique_words=None, maxlen=MAXLEN)
    # Word indices are 0-based, so the Embedding input_dim must be max index + 1.
    N_UNIQUE_WORDS = max(np.max(x_train2), np.max(x_test2)) + 1
    # This is probably too large.
    model2 = Sequential(
        [
            # (number of possible tokens, dimension of embedding space)
            Embedding(N_UNIQUE_WORDS, 128, input_length=MAXLEN),
            Bidirectional(LSTM(64)),
            Dropout(0.5),
            Dense(1, activation="sigmoid"),
        ]
    )
    model2.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
    print("Train...")
    model2.fit(
        x_train2,
        y_train2,
        batch_size=BATCH_SIZE,
        epochs=4,
        validation_data=[x_test2, y_test2],
    )
    print("Evaluate...")
    model2.evaluate(x_test2, y_test2, batch_size=BATCH_SIZE)
    # Increase the size of the embedding space.
    model3 = Sequential(
        [
            # (number of possible tokens, dimension of embedding space)
            Embedding(N_UNIQUE_WORDS, 768, input_length=MAXLEN),
            Bidirectional(LSTM(64)),
            Dropout(0.5),
            Dense(1, activation="sigmoid"),
        ]
    )
    model3.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
    print("Train...")
    model3.fit(
        x_train2,
        y_train2,
        batch_size=BATCH_SIZE,
        epochs=4,
        validation_data=[x_test2, y_test2],
    )
    print("Evaluate...")
    model3.evaluate(x_test2, y_test2, batch_size=BATCH_SIZE)
    # Restore the original np.load now that the IMDB data has been read.
    np.load = np_load_old
import numpy as np
from keras import Model
from keras.layers import (
    Bidirectional,
    Dense,
    Embedding,
    Input,
    LSTM,
    Masking,
    TimeDistributed,
)
if __name__ == "__main__":
    num_classes = 2
    num_symbols = 2
    datapoints = 4
    seq_len = 5
    x = np.random.randint(num_symbols, size=(datapoints, seq_len))
    y = np.random.randint(num_classes, size=(datapoints, seq_len))
    y_one_hot = np.zeros((datapoints, seq_len, num_classes))
    for i in range(datapoints):
        for j in range(seq_len):
            y_one_hot[i][j][y[i][j]] = 1
    embedding_size = 10
    embedding_weights = np.zeros((num_symbols, embedding_size))
    for i in range(num_symbols):
        embedding_weights[i] = np.random.rand(embedding_size)
    input_layer = Input(shape=(seq_len,), dtype=np.int32)
    embedding = Embedding(num_symbols, embedding_size, input_length=seq_len)
    embedded_input = embedding(input_layer)
    mask = Masking(mask_value=0)(embedded_input)
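    # Note: Masking(mask_value=0) here masks timesteps whose *embedded* vector is
    # entirely zero, which is not guaranteed for input symbol 0. An alternative
    # (an assumption, not what this gist does) is Embedding(..., mask_zero=True),
    # which masks on the raw integer index instead.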
    bidirect = Bidirectional(LSTM(100, return_sequences=True))(mask)
    final = TimeDistributed(Dense(num_classes, activation="softmax"))(bidirect)
    model = Model(inputs=[input_layer], outputs=[final])
    model.compile(
        loss="categorical_crossentropy", optimizer="rmsprop", metrics=["accuracy"]
    )
    model.fit(x, y_one_hot)
    print(x)
    print(y)
    print(y_one_hot)
    print(model.predict(x))
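    # A small usage sketch (not in the original): collapse the per-timestep
    # softmax outputs back into class labels for comparison with y above.
    predicted_labels = model.predict(x).argmax(axis=-1)
    print(predicted_labels)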
from functools import reduce
import re
import tarfile
import numpy as np
from keras.utils.data_utils import get_file
from keras.layers import recurrent, Embedding, Dense, concatenate, Input
from keras.models import Model
from keras.preprocessing.sequence import pad_sequences
def tokenize(sent):
    """Return the tokens of a sentence including punctuation.

    >>> tokenize('Bob dropped the apple. Where is the apple?')
    ['Bob', 'dropped', 'the', 'apple', '.', 'Where', 'is', 'the', 'apple', '?']
    """
    return [x.strip() for x in re.split(r"(\W+)+", sent) if x.strip()]
def parse_stories(lines, only_supporting=False):
    """Parse stories provided in the bAbI tasks format.

    If only_supporting is true,
    only the sentences that support the answer are kept.
    """
    data = []
    story = []
    for line in lines:
        line = line.decode("utf-8").strip()
        nid, line = line.split(" ", 1)
        nid = int(nid)
        if nid == 1:
            story = []
        # Question lines are tab-separated into question, answer, and the IDs of
        # the supporting statements; plain statement lines contain no tab.
        if "\t" in line:
            q, a, supporting = line.split("\t")
            q = tokenize(q)
            if only_supporting:
                # Only select the related substory
                supporting = map(int, supporting.split())
                substory = [story[i - 1] for i in supporting]
            else:
                # Provide all the substories
                substory = [x for x in story if x]
            data.append((substory, q, a))
            # Append a placeholder so that story indices stay aligned with the
            # 1-based line IDs used by the supporting-fact references.
            story.append("")
        else:
            sent = tokenize(line)
            story.append(sent)
    return data
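
# A toy illustration (an assumption, not part of the original gist): two
# statement lines followed by a tab-separated question line yield a single
# (substory, question, answer) triple.
# parse_stories([b"1 Mary went to the kitchen.",
#                b"2 John moved to the hallway.",
#                b"3 Where is Mary?\tkitchen\t1"])
# -> [([['Mary', 'went', 'to', 'the', 'kitchen', '.'],
#       ['John', 'moved', 'to', 'the', 'hallway', '.']],
#      ['Where', 'is', 'Mary', '?'], 'kitchen')]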
def get_stories(f, only_supporting=False, max_length=None):
    """Given a file object, read the file, retrieve the stories,
    and then convert the sentences into a single story.

    If max_length is supplied,
    any stories longer than max_length tokens will be discarded.
    """
    data = parse_stories(f.readlines(), only_supporting=only_supporting)
    flatten = lambda data: reduce(lambda x, y: x + y, data)
    return [
        (flatten(story), q, answer)
        for story, q, answer in data
        if not max_length or len(flatten(story)) < max_length
    ]
def vectorize_stories(data, word_idx, story_maxlen, query_maxlen):
    xs = []
    xqs = []
    ys = []
    for story, query, answer in data:
        x = [word_idx[w] for w in story]
        xq = [word_idx[w] for w in query]
        # let's not forget that index 0 is reserved
        y = np.zeros(len(word_idx) + 1)
        y[word_idx[answer]] = 1
        xs.append(x)
        xqs.append(xq)
        ys.append(y)
    return (
        pad_sequences(xs, maxlen=story_maxlen),
        pad_sequences(xqs, maxlen=query_maxlen),
        np.array(ys),
    )
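
# A toy illustration (hypothetical values, not from the gist): stories and
# queries become left-padded index sequences and the answer becomes a one-hot
# vector over the vocabulary, with index 0 reserved for padding.
# toy_word_idx = {"Mary": 1, "went": 2, "Where": 3, "is": 4, "kitchen": 5}
# toy_data = [(["Mary", "went", "kitchen"], ["Where", "is", "Mary"], "kitchen")]
# vectorize_stories(toy_data, toy_word_idx, story_maxlen=4, query_maxlen=3)
# -> (array([[0, 1, 2, 5]]), array([[3, 4, 1]]), array([[0., 0., 0., 0., 0., 1.]]))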
if __name__ == "__main__":
    RNN = recurrent.LSTM
    EMBED_HIDDEN_SIZE = 50
    SENT_HIDDEN_SIZE = 100
    QUERY_HIDDEN_SIZE = 100
    BATCH_SIZE = 128
    EPOCHS = 20
    print(
        "RNN / Embed / Sent / Query = {}, {}, {}, {}".format(
            RNN, EMBED_HIDDEN_SIZE, SENT_HIDDEN_SIZE, QUERY_HIDDEN_SIZE
        )
    )
    try:
        path = get_file(
            "babi-tasks-v1-2.tar.gz",
            origin="https://s3.amazonaws.com/text-datasets/"
            "babi_tasks_1-20_v1-2.tar.gz",
        )
    except Exception:
        print(
            "Error downloading dataset, please download it manually:\n"
            "$ wget http://www.thespermwhale.com/jaseweston/babi/tasks_1-20_v1-2"
            ".tar.gz\n"
            "$ mv tasks_1-20_v1-2.tar.gz ~/.keras/datasets/babi-tasks-v1-2.tar.gz"
        )
        raise
    # Default QA1 with 1000 samples
    # challenge = 'tasks_1-20_v1-2/en/qa1_single-supporting-fact_{}.txt'
    # QA1 with 10,000 samples
    # challenge = 'tasks_1-20_v1-2/en-10k/qa1_single-supporting-fact_{}.txt'
    # QA2 with 1000 samples
    challenge = "tasks_1-20_v1-2/en/qa2_two-supporting-facts_{}.txt"
    # QA2 with 10,000 samples
    # challenge = 'tasks_1-20_v1-2/en-10k/qa2_two-supporting-facts_{}.txt'
    with tarfile.open(path) as tar:
        train = get_stories(tar.extractfile(challenge.format("train")))
        test = get_stories(tar.extractfile(challenge.format("test")))
    vocab = set()
    for story, q, answer in train + test:
        vocab |= set(story + q + [answer])
    vocab = sorted(vocab)
    # Reserve 0 for masking via pad_sequences
    vocab_size = len(vocab) + 1
    word_idx = dict((c, i + 1) for i, c in enumerate(vocab))
    story_maxlen = max(map(len, (x for x, _, _ in train + test)))
    query_maxlen = max(map(len, (x for _, x, _ in train + test)))
    x, xq, y = vectorize_stories(train, word_idx, story_maxlen, query_maxlen)
    tx, txq, ty = vectorize_stories(test, word_idx, story_maxlen, query_maxlen)
    print("vocab = {}".format(vocab))
    print("x.shape = {}".format(x.shape))
    print("xq.shape = {}".format(xq.shape))
    print("y.shape = {}".format(y.shape))
    print("story_maxlen, query_maxlen = {}, {}".format(story_maxlen, query_maxlen))
    print("Build model...")
    sentence = Input(shape=(story_maxlen,), dtype="int32")
    encoded_sentence = Embedding(vocab_size, EMBED_HIDDEN_SIZE)(sentence)
    encoded_sentence = RNN(SENT_HIDDEN_SIZE)(encoded_sentence)
    question = Input(shape=(query_maxlen,), dtype="int32")
    encoded_question = Embedding(vocab_size, EMBED_HIDDEN_SIZE)(question)
    encoded_question = RNN(QUERY_HIDDEN_SIZE)(encoded_question)
    merged = concatenate([encoded_sentence, encoded_question])
    preds = Dense(vocab_size, activation="softmax")(merged)
    model = Model([sentence, question], preds)
    model.compile(
        optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"]
    )
    print("Training")
    model.fit([x, xq], y, batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=0.05)
    print("Evaluation")
    loss, acc = model.evaluate([tx, txq], ty, batch_size=BATCH_SIZE)
    print("Test loss / test accuracy = {:.4f} / {:.4f}".format(loss, acc))
    # [1.7842422828674316, 0.232]
    ######################################################################
    from keras.layers import Bidirectional

    # replace LSTMs with BiLSTMs of half size, summing the Bi-LSTM output
    # [1.7144300785064697, 0.288]
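    # merge_mode="sum" adds the forward and backward outputs, so each encoder
    # still emits SENT_HIDDEN_SIZE // 2 (or QUERY_HIDDEN_SIZE // 2) features;
    # merge_mode="concat" in the next variant doubles that back to the full size.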
    sentence2 = Input(shape=(story_maxlen,), dtype="int32")
    encoded_sentence2 = Embedding(vocab_size, EMBED_HIDDEN_SIZE)(sentence2)
    encoded_sentence2 = Bidirectional(RNN(SENT_HIDDEN_SIZE // 2), merge_mode="sum")(
        encoded_sentence2
    )
    question2 = Input(shape=(query_maxlen,), dtype="int32")
    encoded_question2 = Embedding(vocab_size, EMBED_HIDDEN_SIZE)(question2)
    encoded_question2 = Bidirectional(RNN(QUERY_HIDDEN_SIZE // 2), merge_mode="sum")(
        encoded_question2
    )
    merged2 = concatenate([encoded_sentence2, encoded_question2])
    preds2 = Dense(vocab_size, activation="softmax")(merged2)
    model2 = Model([sentence2, question2], preds2)
    model2.compile(
        optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"]
    )
    # model2.fit([x, xq], y, batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=0.05)
    # replace LSTMs with BiLSTMs of half size, concatenating the Bi-LSTM output
    # [1.6985360298156738, 0.291]
    sentence3 = Input(shape=(story_maxlen,), dtype="int32")
    encoded_sentence3 = Embedding(vocab_size, EMBED_HIDDEN_SIZE)(sentence3)
    encoded_sentence3 = Bidirectional(RNN(SENT_HIDDEN_SIZE // 2), merge_mode="concat")(
        encoded_sentence3
    )
    question3 = Input(shape=(query_maxlen,), dtype="int32")
    encoded_question3 = Embedding(vocab_size, EMBED_HIDDEN_SIZE)(question3)
    encoded_question3 = Bidirectional(RNN(QUERY_HIDDEN_SIZE // 2), merge_mode="concat")(
        encoded_question3
    )
    merged3 = concatenate([encoded_sentence3, encoded_question3])
    preds3 = Dense(vocab_size, activation="softmax")(merged3)
    model3 = Model([sentence3, question3], preds3)
    model3.compile(
        optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"]
    )
    # model3.fit([x, xq], y, batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=0.05)
    # same as above but much smaller LSTMs
    # [1.742978988647461, 0.25]
    sentence4 = Input(shape=(story_maxlen,), dtype="int32")
    encoded_sentence4 = Embedding(vocab_size, EMBED_HIDDEN_SIZE)(sentence4)
    encoded_sentence4 = Bidirectional(RNN(SENT_HIDDEN_SIZE // 5), merge_mode="concat")(
        encoded_sentence4
    )
    question4 = Input(shape=(query_maxlen,), dtype="int32")
    encoded_question4 = Embedding(vocab_size, EMBED_HIDDEN_SIZE)(question4)
    encoded_question4 = Bidirectional(RNN(QUERY_HIDDEN_SIZE // 5), merge_mode="concat")(
        encoded_question4
    )
    merged4 = concatenate([encoded_sentence4, encoded_question4])
    preds4 = Dense(vocab_size, activation="softmax")(merged4)
    model4 = Model([sentence4, question4], preds4)
    model4.compile(
        optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"]
    )
    # model4.fit([x, xq], y, batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=0.05)
import numpy as np
from keras import Model
from keras.layers import Input, LSTM, Masking
if __name__ == "__main__":
    ## adapted from https://stackoverflow.com/q/47057361/
    max_sentence_length = 5
    character_number = 2
    input_tensor = Input(shape=(max_sentence_length, character_number))
    masked_input = Masking(mask_value=0)(input_tensor)
    output = LSTM(3, return_sequences=True)(masked_input)
    model = Model(input_tensor, output)
    model.compile(optimizer="adam", loss="mae")
    X = np.array([[[0, 0], [0, 0], [1, 0], [0, 1], [0, 1]],
                  [[0, 0], [0, 1], [1, 0], [0, 1], [0, 1]]])
    y_true = np.ones((2, max_sentence_length, 3))
    y_pred = model.predict(X)
    print(y_pred)
    print(y_pred.shape)
    # See if the loss computed by model.evaluate() is equal to the masked loss
    unmasked_loss = np.abs(1 - y_pred).mean()
    masked_loss = np.abs(1 - y_pred[y_pred != 0.0]).mean()
    print(f"unmasked loss: {unmasked_loss}")
    print(f"masked loss: {masked_loss}")
    print(f"evaluate with Keras: {model.evaluate(X, y_true, verbose=0)}")
    # model.evaluate(X, y_pred) compares the model's predictions on X with those
    # same predictions, so the reported loss is (near) zero by construction.
    # print(f"evaluate with Keras: {model.evaluate(X, y_pred, verbose=0)}")
    ## try again using a non-zero mask value
    masked_input = Masking(mask_value=8)(input_tensor)
    output = LSTM(3, return_sequences=True)(masked_input)
    model = Model(input_tensor, output)
    model.compile(optimizer="adam", loss="mae")
    X = np.array([[[8, 8], [8, 8], [1, 0], [0, 1], [0, 1]],
                  [[8, 8], [0, 1], [1, 0], [0, 1], [0, 1]]])
    y_true = np.ones((2, max_sentence_length, 3))
    y_pred = model.predict(X)
    print(y_pred)
    print(y_pred.shape)
    # See if the loss computed by model.evaluate() is equal to the masked loss
    unmasked_loss = np.abs(1 - y_pred).mean()
    masked_loss = np.abs(1 - y_pred[y_pred != 0.0]).mean()
    print(f"unmasked loss: {unmasked_loss}")
    print(f"masked loss: {masked_loss}")
    print(f"evaluate with Keras: {model.evaluate(X, y_true, verbose=0)}")
    ## try again using a floating-point mask value
    masked_input = Masking(mask_value=0.0)(input_tensor)
    output = LSTM(3, return_sequences=True)(masked_input)
    model = Model(input_tensor, output)
    model.compile(optimizer="adam", loss="mae")
    X = np.array([[[0.0, 0.0], [0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [0.0, 1.0]],
                  [[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [0.0, 1.0], [0.0, 1.0]]])
    y_true = np.ones((2, max_sentence_length, 3))
    y_pred = model.predict(X)
    print(y_pred)
    print(y_pred.shape)
    # See if the loss computed by model.evaluate() is equal to the masked loss
    unmasked_loss = np.abs(1 - y_pred).mean()
    masked_loss = np.abs(1 - y_pred[y_pred != 0.0]).mean()
    print(f"unmasked loss: {unmasked_loss}")
    print(f"masked loss: {masked_loss}")
    print(f"evaluate with Keras: {model.evaluate(X, y_true, verbose=0)}")