"""Adapted from https://stackoverflow.com/a/53664580/3249688""" | |
import numpy as np | |
from keras.preprocessing import sequence | |
from keras.models import Sequential | |
from keras.layers import Dense, Dropout, Embedding, LSTM, Bidirectional | |
from keras.datasets import imdb | |
def load_data(n_unique_words=None, maxlen=None): | |
# https://stackoverflow.com/a/56243777/3249688 | |
(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=n_unique_words) | |
x_train = sequence.pad_sequences(x_train, maxlen=maxlen) | |
x_test = sequence.pad_sequences(x_test, maxlen=maxlen) | |
y_train = np.array(y_train) | |
y_test = np.array(y_test) | |
return x_train, y_train, x_test, y_test | |
if __name__ == "__main__": | |
np_load_old = np.load | |
np.load = lambda *args, **kwargs: np_load_old(*args, allow_pickle=True, **kwargs) | |
N_UNIQUE_WORDS = 10000 # cut texts after this number of words | |
MAXLEN = 200 | |
BATCH_SIZE = 1024 | |
    x_train, y_train, x_test, y_test = load_data(
        n_unique_words=N_UNIQUE_WORDS, maxlen=MAXLEN
    )

    model = Sequential(
        [
            Embedding(N_UNIQUE_WORDS, 128, input_length=MAXLEN),
            Bidirectional(LSTM(64)),
            Dropout(0.5),
            Dense(1, activation="sigmoid"),
        ]
    )
    model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])

    print("Train...")
    model.fit(
        x_train,
        y_train,
        batch_size=BATCH_SIZE,
        epochs=4,
        validation_data=(x_test, y_test),
    )
    print("Evaluate...")
    model.evaluate(x_test, y_test, batch_size=BATCH_SIZE)
    # Remove the restriction on the number of unique words.
    x_train2, y_train2, x_test2, y_test2 = load_data(n_unique_words=None, maxlen=MAXLEN)
    # The Embedding input dimension must exceed the largest word index, hence + 1.
    N_UNIQUE_WORDS = max(np.max(x_train2), np.max(x_test2)) + 1
    # This vocabulary is probably too large.
    model2 = Sequential(
        [
            # (number of possible tokens, dimension of embedding space)
            Embedding(N_UNIQUE_WORDS, 128, input_length=MAXLEN),
            Bidirectional(LSTM(64)),
            Dropout(0.5),
            Dense(1, activation="sigmoid"),
        ]
    )
    model2.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])

    print("Train...")
    model2.fit(
        x_train2,
        y_train2,
        batch_size=BATCH_SIZE,
        epochs=4,
        validation_data=(x_test2, y_test2),
    )
    print("Evaluate...")
    model2.evaluate(x_test2, y_test2, batch_size=BATCH_SIZE)
    # Increase the size of the embedding space.
    model3 = Sequential(
        [
            # (number of possible tokens, dimension of embedding space)
            Embedding(N_UNIQUE_WORDS, 768, input_length=MAXLEN),
            Bidirectional(LSTM(64)),
            Dropout(0.5),
            Dense(1, activation="sigmoid"),
        ]
    )
    model3.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])

    print("Train...")
    model3.fit(
        x_train2,
        y_train2,
        batch_size=BATCH_SIZE,
        epochs=4,
        validation_data=(x_test2, y_test2),
    )
    print("Evaluate...")
    model3.evaluate(x_test2, y_test2, batch_size=BATCH_SIZE)
    # Restore the original np.load.
    np.load = np_load_old
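    # Aside (not in the original gist): a safer variant of the patch above is
    # a context manager that restores np.load even if loading raises. A
    # minimal sketch using only the standard library:
    from contextlib import contextmanager
    from functools import partial

    @contextmanager
    def numpy_allow_pickle():
        """Temporarily force np.load(..., allow_pickle=True)."""
        original = np.load
        np.load = partial(original, allow_pickle=True)
        try:
            yield
        finally:
            np.load = original

    # Usage: with numpy_allow_pickle(): imdb.load_data(...)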
# ----------------------------------------------------------------------
# Next file in the gist
# ----------------------------------------------------------------------
import numpy as np
from keras import Model
from keras.layers import (
    Bidirectional,
    Dense,
    Embedding,
    Input,
    LSTM,
    Masking,
    TimeDistributed,
)

if __name__ == "__main__":
    num_classes = 2
    num_symbols = 2
    datapoints = 4
    seq_len = 5

    x = np.random.randint(num_symbols, size=(datapoints, seq_len))
    y = np.random.randint(num_classes, size=(datapoints, seq_len))
    # One-hot encode the per-timestep labels.
    y_one_hot = np.zeros((datapoints, seq_len, num_classes))
    for i in range(datapoints):
        for j in range(seq_len):
            y_one_hot[i][j][y[i][j]] = 1
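    # Aside (not in the original): the loop above can be written in one
    # vectorized step by indexing an identity matrix with the label array:
    #     y_one_hot = np.eye(num_classes)[y]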
    embedding_size = 10
    # NOTE: these weights are built but never passed to the Embedding layer below.
    embedding_weights = np.zeros((num_symbols, embedding_size))
    for i in range(num_symbols):
        embedding_weights[i] = np.random.rand(embedding_size)

    input_layer = Input(shape=(seq_len,), dtype=np.int32)
    embedding = Embedding(num_symbols, embedding_size, input_length=seq_len)
    embedded_input = embedding(input_layer)
    # Masking here only skips timesteps whose embedded vector is exactly all
    # zeros, which a trainable embedding rarely produces for token 0.
    mask = Masking(mask_value=0)(embedded_input)
    bidirect = Bidirectional(LSTM(100, return_sequences=True))(mask)
    final = TimeDistributed(Dense(num_classes, activation="softmax"))(bidirect)
    model = Model(inputs=[input_layer], outputs=[final])
    model.compile(
        loss="categorical_crossentropy", optimizer="rmsprop", metrics=["accuracy"]
    )
    model.fit(x, y_one_hot)
    print(x)
    print(y)
    print(y_one_hot)
    print(model.predict(x))
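    # Aside (not in the original): the usual Keras idiom for masking padded
    # integer inputs is to let the Embedding produce the mask itself via
    # mask_zero=True. Index 0 is then reserved for padding, so input_dim grows
    # by one and real token IDs would need shifting up by one. Sketch only:
    masked_embedding = Embedding(
        num_symbols + 1, embedding_size, input_length=seq_len, mask_zero=True
    )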
# ----------------------------------------------------------------------
# Next file in the gist
# ----------------------------------------------------------------------
from functools import reduce
import re
import tarfile

import numpy as np

from keras.utils.data_utils import get_file
from keras.layers import recurrent, Embedding, Dense, concatenate, Input
from keras.models import Model
from keras.preprocessing.sequence import pad_sequences


def tokenize(sent):
    """Return the tokens of a sentence including punctuation.

    >>> tokenize('Bob dropped the apple. Where is the apple?')
    ['Bob', 'dropped', 'the', 'apple', '.', 'Where', 'is', 'the', 'apple', '?']
    """
    return [x.strip() for x in re.split(r"(\W+)", sent) if x.strip()]
def parse_stories(lines, only_supporting=False):
    """Parse stories provided in the bAbI tasks format.

    If only_supporting is true,
    only the sentences that support the answer are kept.
    """
    data = []
    story = []
    for line in lines:
        line = line.decode("utf-8").strip()
        nid, line = line.split(" ", 1)
        nid = int(nid)
        if nid == 1:
            # Line IDs restart at 1 whenever a new story begins.
            story = []
        # Question lines contain tabs: question \t answer \t supporting IDs.
        if "\t" in line:
            q, a, supporting = line.split("\t")
            q = tokenize(q)
            if only_supporting:
                # Only select the related substory
                supporting = map(int, supporting.split())
                substory = [story[i - 1] for i in supporting]
            else:
                # Provide all the substories
                substory = [x for x in story if x]
            data.append((substory, q, a))
            # Keep a placeholder so the 1-based supporting-fact IDs stay aligned.
            story.append("")
        else:
            sent = tokenize(line)
            story.append(sent)
    return data
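# Aside (not in the original): an illustrative fragment of the raw bAbI format
# that parse_stories consumes; IDs restart at 1 for each story, and question
# lines carry tab-separated fields "question \t answer \t supporting line IDs":
#
#     1 Mary moved to the bathroom.
#     2 John went to the hallway.
#     3 Where is Mary?	bathroom	1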
def get_stories(f, only_supporting=False, max_length=None):
    """Given a file name, read the file, retrieve the stories,
    and then convert the sentences into a single story.

    If max_length is supplied,
    any stories longer than max_length tokens will be discarded.
    """
    data = parse_stories(f.readlines(), only_supporting=only_supporting)
    flatten = lambda data: reduce(lambda x, y: x + y, data)
    return [
        (flatten(story), q, answer)
        for story, q, answer in data
        if not max_length or len(flatten(story)) < max_length
    ]
def vectorize_stories(data, word_idx, story_maxlen, query_maxlen):
    xs = []
    xqs = []
    ys = []
    for story, query, answer in data:
        x = [word_idx[w] for w in story]
        xq = [word_idx[w] for w in query]
        # let's not forget that index 0 is reserved
        y = np.zeros(len(word_idx) + 1)
        y[word_idx[answer]] = 1
        xs.append(x)
        xqs.append(xq)
        ys.append(y)
    return (
        pad_sequences(xs, maxlen=story_maxlen),
        pad_sequences(xqs, maxlen=query_maxlen),
        np.array(ys),
    )
if __name__ == "__main__": | |
RNN = recurrent.LSTM | |
EMBED_HIDDEN_SIZE = 50 | |
SENT_HIDDEN_SIZE = 100 | |
QUERY_HIDDEN_SIZE = 100 | |
BATCH_SIZE = 128 | |
EPOCHS = 20 | |
print( | |
"RNN / Embed / Sent / Query = {}, {}, {}, {}".format( | |
RNN, EMBED_HIDDEN_SIZE, SENT_HIDDEN_SIZE, QUERY_HIDDEN_SIZE | |
) | |
) | |
try: | |
path = get_file( | |
"babi-tasks-v1-2.tar.gz", | |
origin="https://s3.amazonaws.com/text-datasets/" | |
"babi_tasks_1-20_v1-2.tar.gz", | |
) | |
except: | |
print( | |
"Error downloading dataset, please download it manually:\n" | |
"$ wget http://www.thespermwhale.com/jaseweston/babi/tasks_1-20_v1-2" | |
".tar.gz\n" | |
"$ mv tasks_1-20_v1-2.tar.gz ~/.keras/datasets/babi-tasks-v1-2.tar.gz" | |
) | |
raise | |
    # Default QA1 with 1000 samples
    # challenge = 'tasks_1-20_v1-2/en/qa1_single-supporting-fact_{}.txt'
    # QA1 with 10,000 samples
    # challenge = 'tasks_1-20_v1-2/en-10k/qa1_single-supporting-fact_{}.txt'
    # QA2 with 1000 samples
    challenge = "tasks_1-20_v1-2/en/qa2_two-supporting-facts_{}.txt"
    # QA2 with 10,000 samples
    # challenge = 'tasks_1-20_v1-2/en-10k/qa2_two-supporting-facts_{}.txt'
    with tarfile.open(path) as tar:
        train = get_stories(tar.extractfile(challenge.format("train")))
        test = get_stories(tar.extractfile(challenge.format("test")))

    vocab = set()
    for story, q, answer in train + test:
        vocab |= set(story + q + [answer])
    vocab = sorted(vocab)

    # Reserve 0 for masking via pad_sequences
    vocab_size = len(vocab) + 1
    word_idx = dict((c, i + 1) for i, c in enumerate(vocab))
    story_maxlen = max(map(len, (x for x, _, _ in train + test)))
    query_maxlen = max(map(len, (x for _, x, _ in train + test)))

    x, xq, y = vectorize_stories(train, word_idx, story_maxlen, query_maxlen)
    tx, txq, ty = vectorize_stories(test, word_idx, story_maxlen, query_maxlen)

    print("vocab = {}".format(vocab))
    print("x.shape = {}".format(x.shape))
    print("xq.shape = {}".format(xq.shape))
    print("y.shape = {}".format(y.shape))
    print("story_maxlen, query_maxlen = {}, {}".format(story_maxlen, query_maxlen))
print("Build model...") | |
sentence = Input(shape=(story_maxlen,), dtype="int32") | |
encoded_sentence = Embedding(vocab_size, EMBED_HIDDEN_SIZE)(sentence) | |
encoded_sentence = RNN(SENT_HIDDEN_SIZE)(encoded_sentence) | |
question = Input(shape=(query_maxlen,), dtype="int32") | |
encoded_question = Embedding(vocab_size, EMBED_HIDDEN_SIZE)(question) | |
encoded_question = RNN(QUERY_HIDDEN_SIZE)(encoded_question) | |
merged = concatenate([encoded_sentence, encoded_question]) | |
preds = Dense(vocab_size, activation="softmax")(merged) | |
model = Model([sentence, question], preds) | |
model.compile( | |
optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"] | |
) | |
print("Training") | |
model.fit([x, xq], y, batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=0.05) | |
print("Evaluation") | |
loss, acc = model.evaluate([tx, txq], ty, batch_size=BATCH_SIZE) | |
print("Test loss / test accuracy = {:.4f} / {:.4f}".format(loss, acc)) | |
    # Observed result: [test loss, test accuracy] = [1.7842422828674316, 0.232]
    ######################################################################
    from keras.layers import Bidirectional

    # Replace the LSTMs with Bi-LSTMs of half size, summing the forward and
    # backward outputs (merge_mode="sum" keeps the output at half size).
    # Observed result: [test loss, test accuracy] = [1.7144300785064697, 0.288]
    sentence2 = Input(shape=(story_maxlen,), dtype="int32")
    encoded_sentence2 = Embedding(vocab_size, EMBED_HIDDEN_SIZE)(sentence2)
    encoded_sentence2 = Bidirectional(RNN(SENT_HIDDEN_SIZE // 2), merge_mode="sum")(
        encoded_sentence2
    )
    question2 = Input(shape=(query_maxlen,), dtype="int32")
    encoded_question2 = Embedding(vocab_size, EMBED_HIDDEN_SIZE)(question2)
    encoded_question2 = Bidirectional(RNN(QUERY_HIDDEN_SIZE // 2), merge_mode="sum")(
        encoded_question2
    )
    merged2 = concatenate([encoded_sentence2, encoded_question2])
    preds2 = Dense(vocab_size, activation="softmax")(merged2)
    model2 = Model([sentence2, question2], preds2)
    model2.compile(
        optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"]
    )
    # model2.fit([x, xq], y, batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=0.05)
    # Replace the LSTMs with Bi-LSTMs of half size, concatenating the forward
    # and backward outputs (merge_mode="concat" restores the full size).
    # Observed result: [test loss, test accuracy] = [1.6985360298156738, 0.291]
    sentence3 = Input(shape=(story_maxlen,), dtype="int32")
    encoded_sentence3 = Embedding(vocab_size, EMBED_HIDDEN_SIZE)(sentence3)
    encoded_sentence3 = Bidirectional(RNN(SENT_HIDDEN_SIZE // 2), merge_mode="concat")(
        encoded_sentence3
    )
    question3 = Input(shape=(query_maxlen,), dtype="int32")
    encoded_question3 = Embedding(vocab_size, EMBED_HIDDEN_SIZE)(question3)
    encoded_question3 = Bidirectional(RNN(QUERY_HIDDEN_SIZE // 2), merge_mode="concat")(
        encoded_question3
    )
    merged3 = concatenate([encoded_sentence3, encoded_question3])
    preds3 = Dense(vocab_size, activation="softmax")(merged3)
    model3 = Model([sentence3, question3], preds3)
    model3.compile(
        optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"]
    )
    # model3.fit([x, xq], y, batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=0.05)
    # Same as above but with much smaller LSTMs.
    # Observed result: [test loss, test accuracy] = [1.742978988647461, 0.25]
    sentence4 = Input(shape=(story_maxlen,), dtype="int32")
    encoded_sentence4 = Embedding(vocab_size, EMBED_HIDDEN_SIZE)(sentence4)
    encoded_sentence4 = Bidirectional(RNN(SENT_HIDDEN_SIZE // 5), merge_mode="concat")(
        encoded_sentence4
    )
    question4 = Input(shape=(query_maxlen,), dtype="int32")
    encoded_question4 = Embedding(vocab_size, EMBED_HIDDEN_SIZE)(question4)
    encoded_question4 = Bidirectional(RNN(QUERY_HIDDEN_SIZE // 5), merge_mode="concat")(
        encoded_question4
    )
    merged4 = concatenate([encoded_sentence4, encoded_question4])
    preds4 = Dense(vocab_size, activation="softmax")(merged4)
    model4 = Model([sentence4, question4], preds4)
    model4.compile(
        optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"]
    )
    # model4.fit([x, xq], y, batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=0.05)
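    # Aside (not in the original): a quick way to compare the capacity of the
    # four variants is to print their trainable parameter counts; "sum" merging
    # leaves the encoder output at half size, while "concat" restores it.
    for name, m in [
        ("lstm", model),
        ("bilstm-sum", model2),
        ("bilstm-concat", model3),
        ("bilstm-small", model4),
    ]:
        print("{}: {} parameters".format(name, m.count_params()))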
# ----------------------------------------------------------------------
# Next file in the gist
# ----------------------------------------------------------------------
import numpy as np
from keras import Model
from keras.layers import Input, LSTM, Masking

if __name__ == "__main__":
    ## adapted from https://stackoverflow.com/q/47057361/
    max_sentence_length = 5
    character_number = 2

    input_tensor = Input(shape=(max_sentence_length, character_number))
    masked_input = Masking(mask_value=0)(input_tensor)
    output = LSTM(3, return_sequences=True)(masked_input)
    model = Model(input_tensor, output)
    model.compile(optimizer="adam", loss="mae")
    # Masking drops a timestep only when *every* feature equals mask_value,
    # so the leading [0, 0] steps are masked here.
    X = np.array([[[0, 0], [0, 0], [1, 0], [0, 1], [0, 1]],
                  [[0, 0], [0, 1], [1, 0], [0, 1], [0, 1]]])
    y_true = np.ones((2, max_sentence_length, 3))
    y_pred = model.predict(X)
    print(y_pred)
    print(y_pred.shape)

    # See if the loss computed by model.evaluate() is equal to the masked loss.
    unmasked_loss = np.abs(1 - y_pred).mean()
    masked_loss = np.abs(1 - y_pred[y_pred != 0.0]).mean()
    print(f"unmasked loss: {unmasked_loss}")
    print(f"masked loss: {masked_loss}")
    print(f"evaluate with Keras: {model.evaluate(X, y_true, verbose=0)}")
    # This is (near-)zero by construction: it evaluates the model against its
    # own predictions, so the MAE between y_pred and the target vanishes.
    # print(f"evaluate with Keras: {model.evaluate(X, y_pred, verbose=0)}")
    ## try again using a non-zero mask value
    masked_input = Masking(mask_value=8)(input_tensor)
    output = LSTM(3, return_sequences=True)(masked_input)
    model = Model(input_tensor, output)
    model.compile(optimizer="adam", loss="mae")
    X = np.array([[[8, 8], [8, 8], [1, 0], [0, 1], [0, 1]],
                  [[8, 8], [0, 1], [1, 0], [0, 1], [0, 1]]])
    y_true = np.ones((2, max_sentence_length, 3))
    y_pred = model.predict(X)
    print(y_pred)
    print(y_pred.shape)

    # See if the loss computed by model.evaluate() is equal to the masked loss.
    unmasked_loss = np.abs(1 - y_pred).mean()
    masked_loss = np.abs(1 - y_pred[y_pred != 0.0]).mean()
    print(f"unmasked loss: {unmasked_loss}")
    print(f"masked loss: {masked_loss}")
    print(f"evaluate with Keras: {model.evaluate(X, y_true, verbose=0)}")
    ## try again using a floating-point mask value
    masked_input = Masking(mask_value=0.0)(input_tensor)
    output = LSTM(3, return_sequences=True)(masked_input)
    model = Model(input_tensor, output)
    model.compile(optimizer="adam", loss="mae")
    X = np.array([[[0.0, 0.0], [0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [0.0, 1.0]],
                  [[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [0.0, 1.0], [0.0, 1.0]]])
    y_true = np.ones((2, max_sentence_length, 3))
    y_pred = model.predict(X)
    print(y_pred)
    print(y_pred.shape)

    # See if the loss computed by model.evaluate() is equal to the masked loss.
    unmasked_loss = np.abs(1 - y_pred).mean()
    masked_loss = np.abs(1 - y_pred[y_pred != 0.0]).mean()
    print(f"unmasked loss: {unmasked_loss}")
    print(f"masked loss: {masked_loss}")
    print(f"evaluate with Keras: {model.evaluate(X, y_true, verbose=0)}")