Skip to content

Instantly share code, notes, and snippets.

@tencia
Last active June 28, 2016 11:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tencia/e2e8686228b670bf9002 to your computer and use it in GitHub Desktop.
Save tencia/e2e8686228b670bf9002 to your computer and use it in GitHub Desktop.
import lasagne as nn
import numpy as np
import theano
import theano.tensor as T
import math
from lasagne.utils import unroll_scan
from lasagne.layers import LSTMLayer
from lasagne.layers import Gate
from lasagne.layers import Layer
# 01/04/2016
# demo of a way to sample on-line from an LSTM in Lasagne by treating hidden/cell states
# as theano variables (instead of reconstructing the sequence each time)
# extension of lasagne.layers.LSTMLayer which re-implements get_output_for to return both
# the cell state and the hidden state (concatenated) instead of just the hidden state
class LSTMSampleableLayer(LSTMLayer):
    """LSTMLayer variant whose output carries the cell state as well.

    The output is the cell state and the hidden state concatenated along the
    last axis (cell first, hidden second), so the last output dimension is
    twice that of the parent layer. Construction is inherited unchanged from
    LSTMLayer.
    """

    def get_output_shape_for(self, input_shapes):
        """Return the parent's output shape with its last dimension doubled."""
        ret = super(LSTMSampleableLayer, self).get_output_shape_for(
            input_shapes)
        if len(ret) > 1:
            return ret[:-1] + (ret[-1]*2,)
        return ret

    def get_output_for(self, inputs, **kwargs):
        """
        Have to re-write LSTMLayer's output construction because we need
        cell_out, which is not stored in the original.
        This follows the same computation except that we return cell_out and
        hid_out concatenated together (along the last axis), instead of just
        hid_out.

        inputs: list of incoming expressions; inputs[0] is the layer input,
            and the mask / initial hidden state / initial cell state are
            read from the positions given by the corresponding
            ``*_incoming_index`` attributes when those are > 0.

        Returns the concatenation [cell_out, hid_out] on the last axis.
        """
        # Retrieve the layer input
        layer_input = inputs[0]
        # Retrieve the mask and initial states when they are supplied
        mask = None
        hid_init = None
        cell_init = None
        if self.mask_incoming_index > 0:
            mask = inputs[self.mask_incoming_index]
        if self.hid_init_incoming_index > 0:
            hid_init = inputs[self.hid_init_incoming_index]
        if self.cell_init_incoming_index > 0:
            cell_init = inputs[self.cell_init_incoming_index]

        # Treat all dimensions after the second as flattened feature dimensions
        if layer_input.ndim > 3:
            layer_input = T.flatten(layer_input, 3)

        # Because scan iterates over the first dimension we dimshuffle to
        # (n_time_steps, n_batch, n_features)
        layer_input = layer_input.dimshuffle(1, 0, 2)
        num_batch = layer_input.shape[1]

        # Stack input weight matrices into a (num_inputs, 4*num_units)
        # matrix, which speeds up computation
        W_in_stacked = T.concatenate(
            [self.W_in_to_ingate, self.W_in_to_forgetgate,
             self.W_in_to_cell, self.W_in_to_outgate], axis=1)

        # Same for hidden weight matrices
        W_hid_stacked = T.concatenate(
            [self.W_hid_to_ingate, self.W_hid_to_forgetgate,
             self.W_hid_to_cell, self.W_hid_to_outgate], axis=1)

        # Stack biases into a (4*num_units) vector
        b_stacked = T.concatenate(
            [self.b_ingate, self.b_forgetgate,
             self.b_cell, self.b_outgate], axis=0)

        if self.precompute_input:
            # Because the input is given for all time steps, we can
            # precompute the inputs dot weight matrices before scanning.
            # W_in_stacked is (n_features, 4*num_units). layer_input is then
            # (n_time_steps, n_batch, 4*num_units).
            layer_input = T.dot(layer_input, W_in_stacked) + b_stacked

        # Scan hands the step function one slice along the first (time)
        # axis, so the gate pre-activations are 2-D with the four gates laid
        # out side by side on the second axis.
        # We define a slicing function that extracts the input to each gate
        def slice_w(x, n):
            return x[:, n*self.num_units:(n+1)*self.num_units]

        # Create single recurrent computation step function
        # input_n is the n'th vector of the input
        def step(input_n, cell_previous, hid_previous, *args):
            if not self.precompute_input:
                input_n = T.dot(input_n, W_in_stacked) + b_stacked

            # Calculate gates pre-activations and slice
            gates = input_n + T.dot(hid_previous, W_hid_stacked)

            # Clip gradients
            if self.grad_clipping:
                gates = theano.gradient.grad_clip(
                    gates, -self.grad_clipping, self.grad_clipping)

            # Extract the pre-activation gate values
            ingate = slice_w(gates, 0)
            forgetgate = slice_w(gates, 1)
            cell_input = slice_w(gates, 2)
            outgate = slice_w(gates, 3)

            if self.peepholes:
                # Compute peephole connections
                ingate += cell_previous*self.W_cell_to_ingate
                forgetgate += cell_previous*self.W_cell_to_forgetgate

            # Apply nonlinearities
            ingate = self.nonlinearity_ingate(ingate)
            forgetgate = self.nonlinearity_forgetgate(forgetgate)
            cell_input = self.nonlinearity_cell(cell_input)

            # Compute new cell value
            cell = forgetgate*cell_previous + ingate*cell_input

            if self.peepholes:
                outgate += cell*self.W_cell_to_outgate
            outgate = self.nonlinearity_outgate(outgate)

            # Compute new hidden unit activation
            hid = outgate*self.nonlinearity(cell)
            return [cell, hid]

        def step_masked(input_n, mask_n, cell_previous, hid_previous, *args):
            cell, hid = step(input_n, cell_previous, hid_previous, *args)

            # Skip over any input with mask 0 by copying the previous
            # hidden state; proceed normally for any input with mask 1.
            not_mask = 1 - mask_n
            cell = cell*mask_n + cell_previous*not_mask
            hid = hid*mask_n + hid_previous*not_mask
            return [cell, hid]

        if mask is not None:
            # mask is given as (batch_size, seq_len). Because scan iterates
            # over first dimension, we dimshuffle to (seq_len, batch_size) and
            # add a broadcastable dimension
            mask = mask.dimshuffle(1, 0, 'x')
            sequences = [layer_input, mask]
            step_fun = step_masked
        else:
            sequences = layer_input
            step_fun = step

        ones = T.ones((num_batch, 1))
        if isinstance(self.cell_init, Layer):
            # cell_init was already taken from `inputs` above
            pass
        elif isinstance(self.cell_init, T.TensorVariable):
            # A symbolic variable given as the initial state is used as-is
            # (assumed to already have one row per batch element -- it is
            # not repeated here).
            cell_init = self.cell_init
        else:
            # Dot against a 1s vector to repeat to shape (num_batch, num_units)
            cell_init = T.dot(ones, self.cell_init)

        if isinstance(self.hid_init, Layer):
            # hid_init was already taken from `inputs` above
            pass
        elif isinstance(self.hid_init, T.TensorVariable):
            # Same as for cell_init: use the symbolic state directly
            hid_init = self.hid_init
        else:
            # Dot against a 1s vector to repeat to shape (num_batch, num_units)
            hid_init = T.dot(ones, self.hid_init)

        # The hidden-to-hidden weight matrix is always used in step
        non_seqs = [W_hid_stacked]
        # The "peephole" weight matrices are only used when self.peepholes=True
        if self.peepholes:
            non_seqs += [self.W_cell_to_ingate,
                         self.W_cell_to_forgetgate,
                         self.W_cell_to_outgate]

        # When we aren't precomputing the input outside of scan, we need to
        # provide the input weights and biases to the step function
        if not self.precompute_input:
            non_seqs += [W_in_stacked, b_stacked]

        if self.unroll_scan:
            # Retrieve the dimensionality of the incoming layer
            input_shape = self.input_shapes[0]
            # Explicitly unroll the recurrence instead of using scan
            cell_out, hid_out = unroll_scan(
                fn=step_fun,
                sequences=sequences,
                outputs_info=[cell_init, hid_init],
                go_backwards=self.backwards,
                non_sequences=non_seqs,
                n_steps=input_shape[1])
        else:
            # Scan op iterates over first dimension of input and repeatedly
            # applies the step function
            cell_out, hid_out = theano.scan(
                fn=step_fun,
                sequences=sequences,
                outputs_info=[cell_init, hid_init],
                go_backwards=self.backwards,
                truncate_gradient=self.gradient_steps,
                non_sequences=non_seqs,
                strict=True)[0]

        # When it is requested that we only return the final sequence step,
        # we need to slice it out immediately after scan is applied
        if self.only_return_final:
            hid_out = hid_out[-1]
            cell_out = cell_out[-1]
        else:
            # dimshuffle back to (n_batch, n_time_steps, n_features)
            hid_out = hid_out.dimshuffle(1, 0, 2)
            cell_out = cell_out.dimshuffle(1, 0, 2)

            # if scan is backward reverse the output along the time axis
            if self.backwards:
                hid_out = hid_out[:, ::-1]
                cell_out = cell_out[:, ::-1]

        # Join on the last (feature) axis. The states have one dimension
        # fewer when only the final step is returned, so a fixed axis=2
        # would be out of range in that case.
        return T.concatenate([cell_out, hid_out], axis=hid_out.ndim - 1)
# Sizes used by the demo: number of LSTM units, batch size, input feature size
nhid, batch_size, zdim = 5, 1, 2

# Symbolic inputs: the input sequence plus the recurrent state, which is fed
# in explicitly so the network can be stepped one call at a time
inputvar = T.tensor3('input')
cellvar = T.matrix('cell_state')
hidvar = T.matrix('hid_state')

# create simple one-layer lstm; the two slice layers split its concatenated
# output back into the cell part (first nhid features) and the hidden part
l_in = nn.layers.InputLayer(shape=(None, None, zdim), input_var=inputvar)
lstm = LSTMSampleableLayer(l_in, nhid, hid_init=hidvar, cell_init=cellvar)
l_c = nn.layers.SliceLayer(lstm, indices=slice(None, nhid), axis=2)
l_h = nn.layers.SliceLayer(lstm, indices=slice(nhid, None), axis=2)

# compile function to run it forward one step
state_outputs = nn.layers.get_output([l_c, l_h], deterministic=True)
fwd = theano.function([inputvar, cellvar, hidvar], state_outputs)

# pull out params (keyed by their string form) so the same outputs can be
# calculated in numpy
pd = {str(p): p.get_value() for p in nn.layers.get_all_params(lstm)}

# element-wise nonlinearities for the numpy implementation
sigm = np.vectorize(lambda y: 1/(1+math.exp(-y)))
tanh = np.vectorize(math.tanh)
def slice_w(x, n, n_units):
    """Return the n'th block of n_units columns of x.

    x is indexed on its second axis, so it must have at least two
    dimensions; block n covers columns n*n_units up to (n+1)*n_units.
    """
    return x[:, n*n_units:(n+1)*n_units]
# Gate order used for every stacked parameter below; slice_w block n
# corresponds to entry n of this tuple.
_GATE_ORDER = ('ingate', 'forgetgate', 'cell', 'outgate')

# Input-to-gate and hidden-to-gate weights, joined side by side on axis 1
W_in_stacked = np.concatenate(
    [pd['W_in_to_' + gate] for gate in _GATE_ORDER], axis=1)
W_hid_stacked = np.concatenate(
    [pd['W_hid_to_' + gate] for gate in _GATE_ORDER], axis=1)
# Gate biases, joined end to end on axis 0
b_stacked = np.concatenate(
    [pd['b_' + gate] for gate in _GATE_ORDER], axis=0)
# function to step lstm forward one step using only numpy
def np_fwd(x, c, h):
    """Compute one LSTM step (with peephole connections) in numpy.

    x: input array; only its first time step ``x[:, 0]`` is used, i.e. the
        second axis is treated as time (as in the network input).
    c: previous cell state, one row per batch element.
    h: previous hidden state, one row per batch element.

    Returns ``([new_cell], [new_hid])`` -- each state wrapped in a
    one-element list.
    """
    # Take the first time step for every batch row. Indexing x[0] instead
    # would select the first batch element, which only coincides with this
    # when the batch size is 1.
    input_n = np.dot(x[:, 0], W_in_stacked) + b_stacked
    gates = input_n + np.dot(h, W_hid_stacked)
    # Pre-activation values of the four gates
    ingate = slice_w(gates, 0, nhid)
    forgetgate = slice_w(gates, 1, nhid)
    cell_input = slice_w(gates, 2, nhid)
    outgate = slice_w(gates, 3, nhid)
    # Peephole connections from the previous cell state
    ingate += c*pd['W_cell_to_ingate']
    forgetgate += c*pd['W_cell_to_forgetgate']
    ingate = sigm(ingate)
    forgetgate = sigm(forgetgate)
    cell_input = tanh(cell_input)
    # New cell state
    ct = forgetgate*c + ingate*cell_input
    # The output gate peeks at the new cell state
    outgate += ct*pd['W_cell_to_outgate']
    outgate = sigm(outgate)
    # New hidden state
    ht = outgate*tanh(ct)
    return [ct], [ht]
# random initial state, shared by the theano and the numpy implementation;
# sized from the configuration instead of repeating the literal dimensions
c_np = c_net = np.random.rand(batch_size, nhid).astype(theano.config.floatX)
h_np = h_net = np.random.rand(batch_size, nhid).astype(theano.config.floatX)
# run forward 3 steps using random input vectors
for step_num in range(3):
    # one time step of input per call
    x = np.random.rand(batch_size, 1, zdim).astype(theano.config.floatX)
    # fwd returns the states with a time axis in position 1; keep the last
    # time step for every batch row ([-1] alone would pick the last batch
    # row instead, which is only equivalent when batch_size is 1)
    c_seq, h_seq = fwd(x, c_net, h_net)
    c_net, h_net = c_seq[:, -1], h_seq[:, -1]
    # np_fwd wraps each state in a one-element list
    c_list, h_list = np_fwd(x, c_np, h_np)
    c_np, h_np = c_list[-1], h_list[-1]
    # single-argument print() behaves the same on Python 2 and Python 3
    for label, value in (('cell, numpy : ', c_np),
                         ('cell, theano: ', c_net),
                         ('hid, numpy : ', h_np),
                         ('hid, theano : ', h_net)):
        print('step {}, {} {}'.format(step_num, label, value))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment