A sample word2vec implementation of the Continuous Bag of Words model in Numpy
import re
import time

import numpy as np
import matplotlib.pyplot as plt
def tokenize(text):
    # Keep alphabetic tokens (optionally containing word characters/apostrophes), lower-cased
    pattern = re.compile(r'[A-Za-z]+[\w^\']*|[\w^\']*[A-Za-z]+[\w^\']*')
    try:
        return pattern.findall(text.lower())
    except AttributeError:
        # Fall back for inputs that do not support .lower()
        return pattern.findall(text)
def generate_mapping(tokens):
    # Generates the word_to_idx and idx_to_word mappings
    word_to_idx, idx_to_word = {}, {}
    for idx, token in enumerate(set(tokens)):
        word_to_idx[token] = idx
        idx_to_word[idx] = token
    return word_to_idx, idx_to_word
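# Illustrative example (added for clarity, not part of the original gist); exact indices
# depend on Python's set iteration order:
#   tokens = tokenize("People create programs to direct processes")
#   # -> ['people', 'create', 'programs', 'to', 'direct', 'processes']
#   word_to_idx, idx_to_word = generate_mapping(tokens)
#   # -> e.g. word_to_idx['people'] == 3 and idx_to_word[3] == 'people'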
def get_training_data(tokens, word_to_ix, CONTEXT_SIZE):
    X, Y = [], []
    for i in range(CONTEXT_SIZE, len(tokens) - CONTEXT_SIZE):
        # Context spans from max(0, i - CONTEXT_SIZE) to min(len(tokens) - 1, i + CONTEXT_SIZE)
        # for j in range(max(0, i - CONTEXT_SIZE), min(len(tokens), i + CONTEXT_SIZE + 1)):
        for j in range(i - CONTEXT_SIZE, i + CONTEXT_SIZE + 1):
            if j == i:
                continue
            Y.append(word_to_ix[tokens[j]])
        X.append(word_to_ix[tokens[i]])
    X, Y = np.array(X), np.array(Y)
    X, Y = X[np.newaxis, :], Y[np.newaxis, :]
    assert 2 * CONTEXT_SIZE * X.shape[1] == Y.shape[1]
    return X, Y
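# Illustrative shape check (added for clarity, not part of the original gist), assuming
# CONTEXT_SIZE = 2 and tokens = ['we', 'are', 'about', 'to', 'study', 'the']:
#   only positions 2 and 3 have a full window, so
#   X (centre-word ids)  has shape (1, 2)
#   Y (context-word ids) has shape (1, 8)   # 2 * CONTEXT_SIZE ids per centre
# which satisfies the assert 2 * CONTEXT_SIZE * X.shape[1] == Y.shape[1] above.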
def load_corpus(filename):
    corpus = ""
    with open(filename, 'r', encoding="utf8") as rf:
        for line in rf:
            # Join lines with a space so words at line breaks are not glued together
            corpus += line.strip() + " "
    return corpus
class CBOW:
    def __init__(self, contexts, targets, vocab_size, frequency_weights, embedding_size, learning_rate, context_size=3, do_negative_sampling=True, sampling_distribution=None):
        # Initialize parameters and caches
        self.embedding_size = embedding_size
        self.context_size = context_size
        self.embedding_matrix = np.random.randn(vocab_size, embedding_size) * 0.01 * frequency_weights.T
        self.num_classes = vocab_size
        self.sampling_distribution = sampling_distribution.flatten() if sampling_distribution is not None else None
        self.frequency_weights = frequency_weights
        self.W = np.random.randn(embedding_size, self.num_classes) * 0.01 * frequency_weights
        self.bias = np.zeros((1, self.W.shape[1]))
        self.word_vec = None
        self.gradients = dict()
        self.cache_contexts = None  # For backprop
        self.cache_Z = None  # For backprop
        self.learning_rate = learning_rate
        self.contexts = contexts
        self.targets = targets
        self.context_width = 2 * context_size
        self.negative_samples = None
        self.do_negative_sampling = do_negative_sampling
    def Embedding(self, contexts):
        # word_vec.shape = (1, embedding_size): the average of the context-word embeddings
        word_vec = np.mean(self.embedding_matrix[contexts, :], axis=1)
        # assert word_vec.shape == (self.context_width, self.embedding_matrix.shape[1])
        assert word_vec.shape == (1, self.embedding_matrix.shape[1])
        self.word_vec = word_vec
    def tanh(self, inp):
        return np.tanh(inp)

    def tanh_delta(self, inp):
        # Derivative of tanh expressed in terms of the tanh output: 1 - tanh(x)^2
        return 1 - inp * inp
    def Dense(self, word_vec, do_negative_sampling):
        # Pass the word_vec through the Dense layer
        if do_negative_sampling:
            op = np.dot(word_vec, self.W[:, self.negative_samples]) + self.bias[:, self.negative_samples]
            # Normalize before passing to tanh
            v_max, v_min = np.max(word_vec), np.min(word_vec)
            op = (op - v_min) / (v_max - v_min + 1e-5)
            # op = op / (np.sum(1e-2 + np.max(op), keepdims=True))
            op = self.tanh(op)
            # print(op.shape, self.negative_samples.shape)
            assert op.shape == (1, self.negative_samples.shape[0])
            return op
        op = np.dot(word_vec, self.W) + self.bias
        # Normalize before passing to tanh
        v_max, v_min = np.max(word_vec), np.min(word_vec)
        op = (op - v_min) / (v_max - v_min + 1e-5)
        # op = op / (np.sum(1e-2 + np.max(op), keepdims=True))
        op = self.tanh(op)
        assert op.shape == (1, self.W.shape[1])
        return op
    def softmax(self, inp, do_negative_sampling):
        if do_negative_sampling:
            assert inp.shape == (1, self.negative_samples.shape[0])
        else:
            assert inp.shape == (1, self.num_classes)
        # Subtract the max for numerical stability before exponentiating
        max_val = np.max(inp, axis=1)
        softmax_out = np.exp(inp - max_val) / (np.sum(np.exp(inp - max_val), keepdims=True) + 1e-3)
        return softmax_out
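    # Worked example (added for clarity, not in the original gist), ignoring the 1e-3
    # smoothing term in the denominator:
    #   softmax([[1.0, 2.0, 3.0]]) subtracts the max (3.0), exponentiates to
    #   [e^-2, e^-1, e^0] = [0.135, 0.368, 1.0], and normalizes to roughly
    #   [0.09, 0.24, 0.67]. The extra 1e-3 makes each output slightly smaller than a
    #   true softmax, so rows sum to a little less than 1.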
    def forward(self, contexts, target, do_negative_sampling):
        if do_negative_sampling:
            def normalize_prob(p):
                return p * (1. / p.sum())

            k = 20
            # Temporarily zero out the target word so it cannot be drawn as a negative sample
            target_frequency = self.sampling_distribution[target]
            self.sampling_distribution[target] = 0
            samples = np.arange(0, self.num_classes, dtype=int)
            self.negative_samples = np.random.choice(samples, k, p=normalize_prob(self.sampling_distribution), replace=False)
            self.sampling_distribution[target] = target_frequency
            self.negative_samples = np.hstack((self.negative_samples, target))  # Last index is the positive sample
        self.Embedding(contexts)
        Z = self.Dense(self.word_vec, do_negative_sampling)
        self.cache_Z = Z
        softmax_out = self.softmax(Z, do_negative_sampling)
        self.cache_contexts = contexts
        return softmax_out
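    # Shape walk-through for one training step (added for clarity, not in the original
    # gist), with embedding size E, vocabulary size V, context_size C and k = 20 negatives:
    #   contexts: (1, 2C) word ids  ->  Embedding: word_vec (1, E)
    #   Dense with negative sampling: Z (1, k + 1); without: Z (1, V)
    #   softmax: same shape as Z, with the last column holding the positive sample
    #   when negative sampling is used.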
    def cross_entropy(self, softmax_out, Y):
        # Under negative sampling the positive sample sits in the last column;
        # for a full-vocabulary softmax the one-hot target comes directly from Y.
        target = np.zeros(softmax_out.shape)
        if softmax_out.shape[1] == self.num_classes:
            target[:, np.squeeze(Y)] = 1
        else:
            target[:, -1] = 1
        loss = -(target * np.log(softmax_out + 1e-3))
        return np.max(loss)
    def backward(self, Y, softmax_out, do_negative_sampling):
        # Perform backprop
        # Z = tanh(O)
        # O = W*word_vec + bias
        # softmax_out = softmax(Z)
        # dL/d(Z) = (softmax_out - Y)
        if do_negative_sampling:
            Z = self.cache_Z
            target = np.zeros(softmax_out.shape)
            target[:, -1] = 1  # The positive sample sits in the last column
            dL_dZ = softmax_out - target
            assert dL_dZ.shape == (1, self.negative_samples.shape[0])
            self.gradients['dL_dZ'] = dL_dZ
            # dZ_dO = (1 - tanh^2(O)) = (1 - z*z)
            dZ_dO = self.tanh_delta(Z)
            assert dZ_dO.shape == (1, self.negative_samples.shape[0])  # (1, k + 1)
            # dL/dO = dL_dZ * dZ/dO
            dL_dO = dL_dZ * dZ_dO
            assert dL_dO.shape == (1, self.negative_samples.shape[0])
            self.gradients['dL_dO'] = dL_dO
            # dL/d(W) = dL/d(O) * d(O)/d(W)
            # d(O)/dW = word_vec
            dO_dW = self.word_vec
            dL_dW = np.dot(dO_dW.T, dL_dO)
            assert dL_dW.shape == (self.W.shape[0], self.negative_samples.shape[0])  # (embedding_size, k + 1)
            self.gradients['dL_dW'] = dL_dW
            # dL/d(word_vec) = dL/dO * dO/d(word_vec) = dL/dO * W
            dL_dword_vec = np.dot(dL_dO, self.W[:, self.negative_samples].T)
            assert dL_dword_vec.shape == self.word_vec.shape  # (1, embedding_size)
            self.gradients['dL_dword_vec'] = dL_dword_vec
            # dL/d(bias) = dL/dO * dO/d(bias) = dL/dO
            dL_dbias = dL_dO
            assert dL_dbias.shape == (1, self.negative_samples.shape[0])
            self.gradients['dL_dbias'] = dL_dbias
            # Clip all gradients
            # for grad in self.gradients:
            #     self.gradients[grad] = np.clip(self.gradients[grad], -500, 500)
            return
        Z = self.cache_Z
        target = np.zeros(softmax_out.shape)
        target[:, np.squeeze(Y)] = 1
        dL_dZ = softmax_out - target
        assert dL_dZ.shape == (1, self.num_classes)
        self.gradients['dL_dZ'] = dL_dZ
        # dZ_dO = (1 - tanh^2(O)) = (1 - z*z)
        dZ_dO = self.tanh_delta(Z)
        assert dZ_dO.shape == (1, self.W.shape[1])  # (1, num_classes)
        # dL/dO = dL_dZ * dZ/dO
        dL_dO = dL_dZ * dZ_dO
        assert dL_dO.shape == (1, self.W.shape[1])
        self.gradients['dL_dO'] = dL_dO
        # dL/d(W) = dL/d(O) * d(O)/d(W)
        # d(O)/dW = word_vec
        dO_dW = self.word_vec
        dL_dW = np.dot(dO_dW.T, dL_dO)
        assert dL_dW.shape == self.W.shape  # (embedding_size, num_classes)
        self.gradients['dL_dW'] = dL_dW
        # dL/d(word_vec) = dL/dO * dO/d(word_vec) = dL/dO * W
        dL_dword_vec = np.dot(dL_dO, self.W.T)
        assert dL_dword_vec.shape == self.word_vec.shape  # (1, embedding_size)
        self.gradients['dL_dword_vec'] = dL_dword_vec
        # dL/d(bias) = dL/dO * dO/d(bias) = dL/dO
        dL_dbias = dL_dO
        assert dL_dbias.shape == self.bias.shape
        self.gradients['dL_dbias'] = dL_dbias
        # Clip all gradients
        # for grad in self.gradients:
        #     self.gradients[grad] = np.clip(self.gradients[grad], -500, 500)
        # print(self.gradients)
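    # Gradient recap (added for clarity, not in the original gist). With one-hot target y,
    # O = word_vec @ W + bias and Z = tanh(O), the chain rule used above is:
    #   dL/dZ = softmax(Z) - y
    #   dL/dO = dL/dZ * (1 - Z*Z)
    #   dL/dW = word_vec.T @ dL/dO,   dL/dbias = dL/dO
    #   dL/dword_vec = dL/dO @ W.T
    # The softmax and cross-entropy are differentiated jointly, which is why no explicit
    # softmax Jacobian appears.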
    def negative_sampling(self, k):
        """
        Performs Negative Sampling to optimize training time.
        This approximates the softmax by updating only a small set of randomly chosen
        negative samples (plus the positive one) instead of the full vocabulary.
        Note: train() does not call this helper; forward() draws its own negative
        samples from the unigram distribution.
        """
        self.negative_samples = np.random.choice(self.num_classes, k, replace=False)
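    # Side note (added, not in the original gist): forward() samples negatives in
    # proportion to the raw unigram frequencies passed in as sampling_distribution.
    # The original word2vec papers instead sample from the unigram distribution raised
    # to the 3/4 power, e.g. something like
    #   p = unigram_distribution ** 0.75
    #   p = p / p.sum()
    # which up-weights rare words; this gist keeps the plain unigram distribution.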
    def update(self, do_negative_sampling):
        # SGD update using the context indices cached during the forward pass
        contexts = self.cache_contexts
        dL_dword_vec = self.gradients['dL_dword_vec']
        self.embedding_matrix[contexts] -= self.learning_rate * dL_dword_vec
        if do_negative_sampling:
            self.W[:, self.negative_samples] -= self.learning_rate * self.gradients['dL_dW']
            self.bias[:, self.negative_samples] -= self.learning_rate * self.gradients['dL_dbias']
            return
        self.W -= self.learning_rate * self.gradients['dL_dW']
        self.bias -= self.learning_rate * self.gradients['dL_dbias']
    def batch_update(self, X_batch, Y_batch, do_negative_sampling):
        # Placeholder: mini-batch updates were never implemented in this gist
        pass
    def train(self, epochs):
        losses = []
        X = self.contexts  # (1, 2 * context_size * num_examples) context-word ids
        Y = self.targets   # (1, num_examples) centre-word ids
        num_examples = Y.shape[1]
        factor = self.context_width  # 2 * context_size context ids per example
        for epoch in range(epochs):
            epoch_loss = 0
            inds = list(range(0, num_examples))
            np.random.shuffle(inds)
            for i in inds:
                # start = time.perf_counter()
                X_item = X[:, i * factor:(i + 1) * factor]
                Y_item = Y[:, i:i + 1]
                softmax_out = self.forward(X_item, Y_item[0], self.do_negative_sampling)
                # self.batch_update(X_batch, Y_batch, self.do_negative_sampling)
                self.backward(Y_item, softmax_out, self.do_negative_sampling)
                self.update(self.do_negative_sampling)
                loss = self.cross_entropy(softmax_out, Y_item)
                epoch_loss += np.squeeze(loss)
                # print(f"Iteration Time = {time.perf_counter() - start}")
            losses.append(epoch_loss)
            if epoch:
                print(f"Loss after epoch #{epoch}: {epoch_loss}")
        plt.plot(np.arange(epochs), losses)
        plt.xlabel('# of epochs')
        plt.ylabel('cost')
        plt.show()
def load_model(filename):
    import pickle
    with open(filename, 'rb') as f:
        return pickle.load(f)

def save_model(model, filename):
    import pickle
    with open(filename, 'wb') as f:
        pickle.dump(model, f, pickle.HIGHEST_PROTOCOL)
def get_max_idx(inp):
    # Get counts and indices
    c = np.array(np.unique(np.argmax(inp, axis=1), return_counts=True))
    # Get the index with the maximum count
    idx = np.argmax(c[1, :])
    return c[0, idx]

def get_index_of_max(inp):
    return np.argmax(inp, axis=1)

def get_max_prob_result(inp, ix_to_word):
    return ix_to_word[get_max_idx(inp)]

def make_context_vector(context, word_to_idx):
    return np.array([[word_to_idx[word] for word in tokenize(context)]])

def generate_model_probs(model, context_vector):
    # Full-softmax forward pass (no negative sampling) for inference
    softmax_out = model.forward(context_vector, None, do_negative_sampling=False)
    return softmax_out
def test(model, corpus, word_to_idx, idx_to_word):
    corpus_tokens = tokenize(corpus)
    corpus_size = len(corpus_tokens)
    for i in range(CONTEXT_SIZE, corpus_size - CONTEXT_SIZE):
        context = " ".join([corpus_tokens[j] for j in range(i - CONTEXT_SIZE, i + CONTEXT_SIZE + 1) if j != i])
        print("Context: " + context)
        context_vector = make_context_vector(context, word_to_idx)
        probs = generate_model_probs(model, context_vector)
        print(f"Prediction: {get_max_prob_result(probs, idx_to_word)} (actual: {corpus_tokens[i]})")
        print(probs)
def get_term_frequency(corpus_tokens, word_to_idx):
    from collections import defaultdict
    freq_dict = defaultdict(int)
    max_freq = -1
    max_idx = -1
    for token in corpus_tokens:
        freq_dict[word_to_idx[token]] += 1
        if freq_dict[word_to_idx[token]] > max_freq:
            max_freq = freq_dict[word_to_idx[token]]
            max_idx = word_to_idx[token]
    return freq_dict, max_freq, max_idx
def get_frequency_distribution(freq_dict, total_frequency):
    # Unigram probability of each word id (uses the module-level vocab_size)
    unigram_distribution = np.zeros((1, vocab_size), dtype='float32')
    for token, freq in freq_dict.items():
        unigram_distribution[:, token] = freq / total_frequency
    return unigram_distribution
def weigh_rare_tokens(freq_dict, max_freq, max_idx):
    import math
    frequency_weights = np.zeros((1, vocab_size), dtype='float32')
    for token, freq in freq_dict.items():
        frequency_weights[:, token] = freq * (max_freq - freq + 1) / math.sqrt(freq)
    # Normalize so that the most frequent word gets weight 1
    frequency_weights = frequency_weights / frequency_weights[:, max_idx]
    # frequency_weights = frequency_weights / (np.sqrt(np.sum(frequency_weights * frequency_weights)))
    return frequency_weights
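# Worked example (added for clarity, not in the original gist): for a toy freq_dict
# with two word ids, {0: 4, 1: 1}, max_freq = 4 and max_idx = 0:
#   weight(0) = 4 * (4 - 4 + 1) / sqrt(4) = 2
#   weight(1) = 1 * (4 - 1 + 1) / sqrt(1) = 4
# After dividing by weight(max_idx) = 2 the weights become [1, 2], so the rarer word id
# gets the larger multiplier on its initial embedding rows.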
WINDOW_SIZE = 3
CONTEXT_SIZE = 3
corpus = """We are about to study the idea of a computational process. We are about to study the idea of a computational process.
Computational processes are abstract beings that inhabit computers.
As they evolve, processes manipulate other abstract things called data.
The evolution of a process is directed by a pattern of rules
called a program. People create programs to direct processes. In effect,
we conjure the spirits of the computer with our spells. Other than that the computers simply perform what we instruct them to do. Other than that the computers simply perform what we instruct them to do."""
# corpus = load_corpus('sample.txt')
corpus_tokens = tokenize(corpus)
word_to_idx, idx_to_word = generate_mapping(corpus_tokens)
targets, contexts = get_training_data(corpus_tokens, word_to_idx, WINDOW_SIZE)
vocab_size = len(set(corpus_tokens))
freq_dict, max_freq, max_idx = get_term_frequency(corpus_tokens, word_to_idx)
frequency_weights = weigh_rare_tokens(freq_dict, max_freq, max_idx)
total_frequency = sum(freq_dict.values())
unigram_distribution = get_frequency_distribution(freq_dict, total_frequency)
try:
    model = load_model("model.pkl")
except FileNotFoundError:
    model = CBOW(contexts, targets, vocab_size, frequency_weights=frequency_weights, embedding_size=300, learning_rate=0.5, context_size=CONTEXT_SIZE, do_negative_sampling=True, sampling_distribution=unigram_distribution)
model.train(50)
save_model(model, "model.pkl")
test(model, corpus, word_to_idx, idx_to_word)
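# Optional follow-up sketch (added, not part of the original gist): after training, the
# learned vectors live in model.embedding_matrix (one row per word id), so nearby words
# can be inspected with plain cosine similarity. `most_similar` and `topn` are names
# introduced here for illustration; the sketch assumes the `model`, `word_to_idx` and
# `idx_to_word` variables defined above.
def most_similar(query, topn=5):
    # Cosine similarity between the query word's vector and every row of the embedding matrix
    vec = model.embedding_matrix[word_to_idx[query]]
    norms = np.linalg.norm(model.embedding_matrix, axis=1) * np.linalg.norm(vec) + 1e-8
    sims = model.embedding_matrix @ vec / norms
    best = np.argsort(-sims)
    return [(idx_to_word[int(i)], float(sims[i])) for i in best if idx_to_word[int(i)] != query][:topn]

print(most_similar("computational"))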