"""
Character RNN in python
Code adaptation from Andrej Karpathy's Vanilla RNN blog post.
Prepared by Adrian Sarno (mr.sarno2@gmail.com)
"""
import numpy as np
def one_hot(c, num_of_classes):
    """Encode a character index as a one-hot column vector."""
    vec = np.zeros((num_of_classes, 1))
    vec[c, 0] = 1
    return vec
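# For example, one_hot(2, 5) returns the (5, 1) column vector
# [[0.], [0.], [1.], [0.], [0.]], with a single 1 at index 2.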
class RNN:
    """A single vanilla RNN layer with a softmax output, trained with Adagrad."""

    def __init__(self, N_classes, N_hidden, N_input, learning_rate):
        # model parameters
        self.W_xh = np.random.normal(0.0, 1e-2, size=(N_hidden, N_classes))  # input to hidden-state weights
        self.W_hh = np.random.normal(0.0, 1e-2, size=(N_hidden, N_hidden))   # hidden to hidden weights
        self.W_hy = np.random.normal(0.0, 1e-2, size=(N_classes, N_hidden))  # hidden to output weights
        # (there is no input bias term)
        self.b_h = np.zeros((N_hidden, 1))   # hidden bias
        self.b_y = np.zeros((N_classes, 1))  # output bias
        # dimensions and training hyperparameters
        self.N_hidden = N_hidden
        self.N_classes = N_classes
        self.N_input = N_input               # number of steps the RNN is unrolled for
        self.learning_rate = learning_rate
    def forward_step(self, x, h):
        """
        params:
            x: 1-of-k encoding of an input char
            h: hidden state (from the previous char)
        """
        # compute affine transforms
        xh = np.dot(self.W_xh, x)
        hh = np.dot(self.W_hh, h) + self.b_h
        # linearly combine input and state scores
        scores = hh + xh
        # compute the new hidden state:
        # non-linear squashing of state+input scores into [-1, 1]
        h = np.tanh(scores)
        # compute the output vector:
        # unnormalized log probabilities (class scores) for the next char
        y = np.dot(self.W_hy, h) + self.b_y
        return h, y
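    # For reference, forward_step implements the standard vanilla-RNN recurrence:
    #   h_t = tanh(W_xh . x_t + W_hh . h_{t-1} + b_h)
    #   y_t = W_hy . h_t + b_y
    # where x_t is the one-hot input and y_t holds the unnormalized class scores.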
    def backward_pass(self, inputs, targets, xs, hs, ps):
        """
        backward pass: compute the gradients going backwards through the sequence
        """
        # initialize the gradients with zeros
        dL_dWxh, dL_dWhh, dL_dWhy = np.zeros_like(self.W_xh), np.zeros_like(self.W_hh), np.zeros_like(self.W_hy)
        dL_dbh, dL_dby = np.zeros_like(self.b_h), np.zeros_like(self.b_y)
        # the gradient flowing into the state from beyond the last time step is zero
        # (no later time step contributes to the loss)
        dL_dhnext = np.zeros_like(hs[0])
        for t in reversed(range(len(inputs))):
            # gradient of the loss with respect to the output scores (pre-softmax):
            #   dL_dy = (y_hat - y)
            # since y_hat is a distribution over the vocabulary and y is a one-hot
            # vector with a single 1 in the place of the target char, (y_hat - y)
            # equals y_hat after subtracting 1 from the probability of the target class k
            k = targets[t]
            dL_dy = np.copy(ps[t])
            dL_dy[k] -= 1
            # backprop into the output weights
            dL_dWhy += np.dot(dL_dy, hs[t].T)
            dL_dby += dL_dy
            dL_dh = np.dot(self.W_hy.T, dL_dy) + dL_dhnext  # backprop into h
            dL_dhraw = (1 - hs[t] * hs[t]) * dL_dh          # backprop through the tanh nonlinearity
            # backprop into the state weights
            dL_dbh += dL_dhraw
            dL_dWhh += np.dot(dL_dhraw, hs[t-1].T)
            # backprop across the sequence of state-to-next-state transforms
            dL_dhnext = np.dot(self.W_hh.T, dL_dhraw)
            # backprop into the input weights
            dL_dWxh += np.dot(dL_dhraw, xs[t].T)
        # clip to mitigate exploding gradients
        for dparam in [dL_dWxh, dL_dWhh, dL_dWhy, dL_dbh, dL_dby]:
            np.clip(dparam, -5, 5, out=dparam)
        return dL_dWxh, dL_dWhh, dL_dWhy, dL_dbh, dL_dby
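    # Note on the tanh backprop above: since h = tanh(z), dh/dz = 1 - tanh(z)^2 = 1 - h*h,
    # which is why the pre-activation gradient is dL_dhraw = (1 - hs[t]*hs[t]) * dL_dh.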
    def train(self, inputs, targets, h):
        """
        returns the loss, the gradients on the model parameters, and the last hidden state
        params:
            inputs:  input sequence (list of character indices)
            targets: target sequence (list of character indices)
            h:       the initial hidden state, [N_hidden x 1] array
        """
        xs, hs, ys, ps = {}, {}, {}, {}
        # store the initial state in the dictionary of states, with key = -1
        hs[-1] = np.copy(h)
        # forward pass
        for t, c in enumerate(inputs):
            xs[t] = one_hot(c, self.N_classes)
            hs[t], ys[t] = self.forward_step(xs[t], hs[t-1])
            e = np.exp(ys[t] - np.max(ys[t]))  # shift by the max score for numerical stability
            ps[t] = e / np.sum(e)              # softmax: probabilities for the next char
        # compute the cross-entropy loss
        loss = 0
        for t in range(len(inputs)):
            c = targets[t]
            probs = ps[t]
            loss += -np.log(probs[c, 0])
        # backward pass: compute gradients going backwards
        dWxh, dWhh, dWhy, dbh, dby = self.backward_pass(inputs, targets, xs, hs, ps)
        return loss, dWxh, dWhh, dWhy, dbh, dby, hs[len(inputs)-1]
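    # The loss computed above is the summed cross-entropy over the sequence:
    #   L = -sum_t log p_t[target_t]
    # i.e. the negative log-probability the model assigns to each correct next char.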
    def prep_sequences(self, data, p, h):
        """
        prepare the next input/target sequences
        class vars:
            - N_input: number of steps to unroll the RNN for
              (the length of the input sequence looked at in each step)
        """
        if p + self.N_input + 1 >= len(data):
            # reset the position pointer in the data and also clear the RNN internal state
            h = np.zeros((self.N_hidden, 1))
            p = 0
        # char_to_ix is the module-level char-to-index mapping built from the corpus
        inputs = [char_to_ix[ch] for ch in data[p : p+self.N_input]]
        targets = [char_to_ix[ch] for ch in data[p+1 : p+self.N_input+1]]
        return inputs, targets, p, h
    def process(self, data, num_steps):
        """
        train on the text corpus
        params:
            data:      the text corpus (a string)
            num_steps: number of training steps to run
        """
        mWxh, mWhh, mWhy = np.zeros_like(self.W_xh), np.zeros_like(self.W_hh), np.zeros_like(self.W_hy)
        mbh, mby = np.zeros_like(self.b_h), np.zeros_like(self.b_y)  # memory variables for Adagrad
        smooth_loss = -np.log(1.0/self.N_classes) * self.N_input     # loss at iteration 0
        p = 0
        h = np.zeros((self.N_hidden, 1))
        for curr_step in range(num_steps):
            # prepare inputs (we're sweeping from left to right in steps N_input long)
            inputs, targets, p, h = self.prep_sequences(data, p, h)
            # sample from the model now and then
            if (curr_step + 1) % (num_steps // 5) == 0:
                sample_ix = self.predict(h, inputs[0], 200)
                txt = ''.join(ix_to_char[ix] for ix in sample_ix)
                print('---- after %d steps (smooth loss %.4f):\n%s\n----' % (curr_step + 1, smooth_loss, txt))
            # forward N_input characters through the net and fetch the gradients
            loss, dL_dWxh, dL_dWhh, dL_dWhy, dL_dbh, dL_dby, h = self.train(inputs, targets, h)
            smooth_loss = smooth_loss * 0.999 + loss * 0.001  # exponentially smoothed loss, for reporting
            # perform the parameter update with Adagrad
            for param, dparam, mem in zip([self.W_xh, self.W_hh, self.W_hy, self.b_h, self.b_y],
                                          [dL_dWxh, dL_dWhh, dL_dWhy, dL_dbh, dL_dby],
                                          [mWxh, mWhh, mWhy, mbh, mby]):
                mem += dparam * dparam
                param += -self.learning_rate * dparam / np.sqrt(mem + 1e-8)  # adagrad update
            p += self.N_input  # move the data pointer
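    # The Adagrad update used above keeps a per-parameter sum of squared gradients m
    # and scales each step by its inverse square root:
    #   m     <- m + g^2
    #   theta <- theta - learning_rate * g / sqrt(m + 1e-8)
    # so parameters that receive large gradients get progressively smaller steps.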
    def predict(self, h, seed_ix, n):
        """
        sample a sequence of characters from the model
        params:
            h:       the initial internal state (seed state); we can use the training state
            seed_ix: the index of the initial input character (seed character)
            n:       the number of sampled characters to output
        """
        x = np.zeros((self.N_classes, 1))
        x[seed_ix] = 1
        ixes = []
        for t in range(n):
            h = np.tanh(np.dot(self.W_xh, x) + np.dot(self.W_hh, h) + self.b_h)
            y = np.dot(self.W_hy, h) + self.b_y
            e = np.exp(y - np.max(y))  # shift by the max score for numerical stability
            p = e / np.sum(e)          # softmax
            ix = np.random.choice(range(self.N_classes), p=p.ravel())  # ravel returns a contiguous flattened array
            x = np.zeros((self.N_classes, 1))
            x[ix] = 1
            ixes.append(ix)
        return ixes
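# A minimal finite-difference gradient check, a standard way to sanity-check the
# analytic gradients returned by backward_pass. This is an illustrative sketch:
# the helper name grad_check and the eps / num_checks values are choices made for
# this example, not part of the model above.
def grad_check(rnn, inputs, targets, h, eps=1e-5, num_checks=5):
    _, dWxh, dWhh, dWhy, dbh, dby, _ = rnn.train(inputs, targets, h)
    for param, dparam, name in zip([rnn.W_xh, rnn.W_hh, rnn.W_hy, rnn.b_h, rnn.b_y],
                                   [dWxh, dWhh, dWhy, dbh, dby],
                                   ['W_xh', 'W_hh', 'W_hy', 'b_h', 'b_y']):
        for _ in range(num_checks):
            ix = np.random.randint(param.size)
            old_val = param.flat[ix]
            param.flat[ix] = old_val + eps   # nudge the parameter up
            loss_plus = rnn.train(inputs, targets, h)[0]
            param.flat[ix] = old_val - eps   # nudge the parameter down
            loss_minus = rnn.train(inputs, targets, h)[0]
            param.flat[ix] = old_val         # restore the original value
            grad_numerical = (loss_plus - loss_minus) / (2 * eps)
            grad_analytic = dparam.flat[ix]
            denom = abs(grad_numerical) + abs(grad_analytic)
            rel_error = abs(grad_analytic - grad_numerical) / denom if denom > 0 else 0.0
            print('%s: analytic %e, numerical %e, relative error %e'
                  % (name, grad_analytic, grad_numerical, rel_error))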
# read the text corpus
from keras.utils.data_utils import get_file
path = get_file('nietzsche.txt', origin="https://s3.amazonaws.com/text-datasets/nietzsche.txt")
data = open(path).read().lower()
print('corpus length:', len(data))

# build the vocabulary and the char <-> index mappings
chars = sorted(set(data))  # sort for a deterministic char ordering across runs
data_size, vocab_size = len(data), len(chars)
print('data has %d characters, %d unique.' % (data_size, vocab_size))
char_to_ix = {ch: i for i, ch in enumerate(chars)}
ix_to_char = {i: ch for i, ch in enumerate(chars)}

# hyperparameters
N_hidden = 128        # size of the hidden layer of neurons
seq_length = 30       # number of steps to unroll the RNN for
learning_rate = 1e-2

# run
rnn1 = RNN(vocab_size, N_hidden, seq_length, learning_rate)
rnn1.process(data, 100000)
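# Illustrative example: draw a longer sample from the trained model with predict().
# The zero initial state, the seed character (the first char of the corpus) and the
# 500-character sample length are arbitrary choices for this example.
h0 = np.zeros((N_hidden, 1))
seed_ix = char_to_ix[data[0]]
sample_ix = rnn1.predict(h0, seed_ix, 500)
print(''.join(ix_to_char[ix] for ix in sample_ix))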