Last active
March 24, 2018 09:59
-
-
Save shivamsaboo17/7528c01eae3c027d1f4b51a87c0d5b03 to your computer and use it in GitHub Desktop.
RNN implemented from scratch just using numpy. Ready to use framework by just plugging in data!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import pickle | |
""" | |
To use this RNN model, do the following:
from rnn import RNN
model = RNN(word_dim, hidden_dim, truncate_back_prop_steps)
model.train(model, x_train, y_train, learning_rate, epochs, calculate_loss_after)
model.predict(x)
model.save_model(file_name)
""" | |
class RNN:
    """Single-layer recurrent network trained with truncated BPTT (NumPy only).

    The model computes, for every time step ``i`` of an input sequence::

        s[i] = tanh(U[:, x[i]] + W . s[i-1])
        o[i] = softmax(V . s[i])

    Inputs are sequences of integer word indices; they are used directly to
    select columns of ``U``, which is equivalent to multiplying ``U`` by a
    one-hot vector.
    """

    def __init__(self, word_dim, hidden_dim, back_truncate=4):
        """Create a model with randomly initialised weights.

        Args:
            word_dim: Size of the vocabulary (input and output dimension).
            hidden_dim: Size of the hidden state.
            back_truncate: Maximum number of steps a gradient is propagated
                back through time from each output.
        """
        self.word_dim = word_dim
        self.hidden_dim = hidden_dim
        self.back_truncate = back_truncate
        # Each matrix is drawn uniformly from +/- 1/sqrt(fan_in).
        input_bound = np.sqrt(1. / word_dim)
        hidden_bound = np.sqrt(1. / hidden_dim)
        self.U = np.random.uniform(-input_bound, input_bound, (hidden_dim, word_dim))
        self.V = np.random.uniform(-hidden_bound, hidden_bound, (word_dim, hidden_dim))
        self.W = np.random.uniform(-hidden_bound, hidden_bound, (hidden_dim, hidden_dim))

    @staticmethod
    def softmax(x):
        """Return the softmax of ``x``, normalised over all of its elements."""
        # Subtracting the maximum keeps np.exp from overflowing.
        xt = np.exp(x - np.max(x))
        return xt / np.sum(xt)

    def forward_propagate(self, x):
        """Run the network over one sequence.

        Args:
            x: Sequence of integer word indices.

        Returns:
            A tuple ``(o, s)``: ``o`` has shape ``(len(x), word_dim)`` and
            holds the output distribution of every step; ``s`` has shape
            ``(len(x) + 1, hidden_dim)`` and holds the hidden states.
        """
        t = len(x)
        # s has one extra row that is never written. It stays all-zero and is
        # reached as s[-1], i.e. the "previous" hidden state of step 0.
        s = np.zeros((t + 1, self.hidden_dim))
        o = np.zeros((t, self.word_dim))
        for i in range(t):
            # Indexing a column of U is the same as U . one_hot(x[i]).
            s[i] = np.tanh(self.U[:, x[i]] + np.dot(self.W, s[i - 1]))
            o[i] = self.softmax(np.dot(self.V, s[i]))
        return o, s

    def predict(self, x):
        """Return the most probable word index at every step of ``x``."""
        o, _ = self.forward_propagate(x)
        return np.argmax(o, axis=1)

    def total_loss(self, x, y):
        """Return the summed cross-entropy loss over all sequences.

        Args:
            x: Collection of input sequences (word indices).
            y: Collection of target sequences, parallel to ``x``.
        """
        total = 0
        for x_seq, y_seq in zip(x, y):
            o, _ = self.forward_propagate(x_seq)
            # Probability the model assigned to the correct word at each step.
            correct_word_prediction = o[np.arange(len(y_seq)), y_seq]
            total += -1 * np.sum(np.log(correct_word_prediction))
        return total

    def loss(self, x, y):
        """Return the cross-entropy loss averaged over every target word."""
        # Builtin sum: calling np.sum on a generator is deprecated in NumPy.
        n = sum(len(y_i) for y_i in y)
        return self.total_loss(x, y) / n

    def back_propagate(self, x, y):
        """Compute loss gradients for one sequence with truncated BPTT.

        Args:
            x: Input sequence of word indices.
            y: Target sequence of word indices, same length as ``x``.

        Returns:
            A tuple ``(dldu, dldv, dldw)`` with the gradients for ``U``,
            ``V`` and ``W``; each has the shape of its weight matrix.
        """
        t = len(x)
        o, s = self.forward_propagate(x)
        dldu = np.zeros(self.U.shape)
        dldv = np.zeros(self.V.shape)
        dldw = np.zeros(self.W.shape)
        # Gradient of softmax + cross-entropy w.r.t. the logits is
        # (prediction - one_hot(target)). `o` is local, so edit it in place.
        del_o = o
        del_o[np.arange(len(y)), y] -= 1
        for i in range(t):
            dldv += np.outer(del_o[i], s[i].T)
            # Error arriving at the hidden state of step i (tanh' = 1 - s^2).
            delta_t = self.V.T.dot(del_o[i]) * (1 - (s[i] ** 2))
            # Walk back at most `back_truncate` steps; going further costs
            # more and the repeated multiplications shrink the gradient.
            first_step = max(0, i - self.back_truncate)
            for back_prop_step in range(i, first_step - 1, -1):
                # For step 0, s[-1] is the all-zero initial state.
                dldw += np.outer(delta_t, s[back_prop_step - 1])
                dldu[:, x[back_prop_step]] += delta_t
                delta_t = self.W.T.dot(delta_t) * (1 - s[back_prop_step - 1] ** 2)
        return dldu, dldv, dldw

    def update_params(self, x, y, learning_rate):
        """Apply one gradient-descent step computed from a single sequence."""
        dldu, dldv, dldw = self.back_propagate(x, y)
        self.U -= learning_rate * dldu
        self.V -= learning_rate * dldv
        self.W -= learning_rate * dldw

    @staticmethod
    def train(model, x_train, y_train, learning_rate=0.005, n_epochs=100, evaluate_loss_after=1):
        """Train ``model`` with per-sequence stochastic gradient descent.

        Args:
            model: The RNN instance to train (updated in place).
            x_train: Collection of input sequences.
            y_train: Collection of target sequences, parallel to ``x_train``.
            learning_rate: Step size of each parameter update.
            n_epochs: Number of passes over the training data.
            evaluate_loss_after: The loss is evaluated and printed before
                every epoch whose index is a multiple of this value.

        Returns:
            List of ``(num_examples_seen, loss)`` tuples, one per evaluation.
        """
        losses = []
        num_examples_seen = 0
        for epoch in range(n_epochs):
            if epoch % evaluate_loss_after == 0:
                loss = model.loss(x_train, y_train)
                losses.append((num_examples_seen, loss))
                print(losses[-1])
            for x_seq, y_seq in zip(x_train, y_train):
                model.update_params(x_seq, y_seq, learning_rate)
                num_examples_seen += 1
        return losses

    def save_model(self, file_name):
        """Pickle this model to ``file_name``.

        Note: only unpickle such files when they come from a trusted source.
        """
        with open(file_name, 'wb') as f:
            pickle.dump(self, f, pickle.HIGHEST_PROTOCOL)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment