class BidirectionalLSTMLayer(Layer):
    '''
    A bidirectional long short-term memory (LSTM) layer. Includes "peephole
    connections" and a forget gate. Based on the definition in
    [#graves2014generating]_, which is the current common definition. Gate
    names are taken from [#zaremba2014]_, figure 1.

    :references:
        .. [#graves2014generating] Alex Graves, "Generating Sequences With
           Recurrent Neural Networks".
        .. [#zaremba2014] Zaremba, W. et al., "Recurrent Neural Network
           Regularization". (http://arxiv.org/abs/1409.2329)
    '''
    def __init__(self, input_layer, num_units,
                 W_in_to_gates=init.Normal(0.1),
                 W_hid_to_gates=init.Normal(0.1),
                 W_cell_to_gates=init.Normal(0.1),
                 b_gates=init.Normal(0.1),
                 nonlinearity_ingate=nonlinearities.sigmoid,
                 nonlinearity_forgetgate=nonlinearities.sigmoid,
                 nonlinearity_modulationgate=nonlinearities.tanh,
                 nonlinearity_outgate=nonlinearities.sigmoid,
                 nonlinearity_out=nonlinearities.tanh,
                 cell_init=init.Constant(0.),
                 hid_init=init.Constant(0.),
                 learn_init=False,
                 peepholes=True):
        '''
        Initialize a bidirectional LSTM layer. For details on what the
        parameters mean, see (7-11) from [#graves2014generating]_.

        The gate parameters are stacked: each weight matrix (and the bias
        vector) holds the parameters of all gates, in the order
        [ingate, forgetgate, modulationgate, outgate].

        :parameters:
            - input_layer : layers.Layer
                Input to this recurrent layer
            - num_units : int
                Number of hidden units
            - W_in_to_gates : function or np.ndarray or theano.shared
                Stacked input-to-gate weights
                :math:`[W_{xi}, W_{xf}, W_{xc}, W_{xo}]`,
                shape (num_inputs, 4*num_units)
            - W_hid_to_gates : function or np.ndarray or theano.shared
                Stacked hidden-to-gate weights
                :math:`[W_{hi}, W_{hf}, W_{hc}, W_{ho}]`,
                shape (num_units, 4*num_units)
            - W_cell_to_gates : function or np.ndarray or theano.shared
                Stacked peephole weights :math:`[W_{ci}, W_{cf}, W_{co}]`,
                shape (3*num_units,)
            - b_gates : function or np.ndarray or theano.shared
                Stacked gate biases :math:`[b_i, b_f, b_c, b_o]`,
                shape (4*num_units,)
            - nonlinearity_ingate : function
                :math:`\sigma`
            - nonlinearity_forgetgate : function
                :math:`\sigma`
            - nonlinearity_modulationgate : function
                :math:`\tanh`
            - nonlinearity_outgate : function
                :math:`\sigma`
            - nonlinearity_out : function
                :math:`\tanh`
            - cell_init : function or np.ndarray or theano.shared
                :math:`c_0`
            - hid_init : function or np.ndarray or theano.shared
                :math:`h_0`
            - learn_init : boolean
                If True, initial hidden values are learned
            - peepholes : boolean
                If True, the LSTM uses peephole connections.
                When False, W_cell_to_gates is ignored.
        '''
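        # Example (hypothetical values): the stacked parameters can also be
        # supplied as arrays, e.g.
        #   W_in_to_gates=np.random.normal(0, 0.01, (num_inputs, 4*num_units))
        #   b_gates=np.zeros(4*num_units)
        # where num_inputs is the feature dimension of input_layer.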
        # Initialize parent layer
        super(BidirectionalLSTMLayer, self).__init__(input_layer)

        # For any of the nonlinearities, if None is supplied, use identity
        if nonlinearity_ingate is None:
            self.nonlinearity_ingate = nonlinearities.identity
        else:
            self.nonlinearity_ingate = nonlinearity_ingate

        if nonlinearity_forgetgate is None:
            self.nonlinearity_forgetgate = nonlinearities.identity
        else:
            self.nonlinearity_forgetgate = nonlinearity_forgetgate

        if nonlinearity_modulationgate is None:
            self.nonlinearity_modulationgate = nonlinearities.identity
        else:
            self.nonlinearity_modulationgate = nonlinearity_modulationgate

        if nonlinearity_outgate is None:
            self.nonlinearity_outgate = nonlinearities.identity
        else:
            self.nonlinearity_outgate = nonlinearity_outgate

        if nonlinearity_out is None:
            self.nonlinearity_out = nonlinearities.identity
        else:
            self.nonlinearity_out = nonlinearity_out

        self.learn_init = learn_init
        self.num_units = num_units
        self.peepholes = peepholes

        # Input dimensionality is the output dimensionality of the input layer
        (num_batch, _, num_inputs) = self.input_layer.get_output_shape()
        # FORWARD WEIGHTS
        # Stack input-to-gate, hidden-to-gate and cell-to-gate weights (and
        # the gate biases) into single tensors.
        if self.peepholes:
            self.W_cell_to_gates_fwd = self.create_param(
                W_cell_to_gates, (3*num_units,))
        else:
            self.W_cell_to_gates_fwd = None
        self.b_gates_fwd = self.create_param(b_gates, (4*num_units,))
        self.W_hid_to_gates_fwd = self.create_param(
            W_hid_to_gates, (num_units, 4*num_units))
        self.W_in_to_gates_fwd = self.create_param(
            W_in_to_gates, (num_inputs, 4*num_units))

        # Setup initial values for the cell and the lstm hidden units
        self.cell_init_fwd = self.create_param(cell_init, (num_batch, num_units))
        self.hid_init_fwd = self.create_param(hid_init, (num_batch, num_units))

        # BACKWARD WEIGHTS
        if self.peepholes:
            self.W_cell_to_gates_bck = self.create_param(
                W_cell_to_gates, (3*num_units,))
        else:
            self.W_cell_to_gates_bck = None
        self.b_gates_bck = self.create_param(b_gates, (4*num_units,))
        self.W_hid_to_gates_bck = self.create_param(
            W_hid_to_gates, (num_units, 4*num_units))
        self.W_in_to_gates_bck = self.create_param(
            W_in_to_gates, (num_inputs, 4*num_units))

        # Setup initial values for the cell and the lstm hidden units
        self.cell_init_bck = self.create_param(cell_init, (num_batch, num_units))
        self.hid_init_bck = self.create_param(hid_init, (num_batch, num_units))
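        # Layout note: along their last axis the stacked gate tensors are
        # ordered [ingate | forgetgate | modulationgate | outgate], matching
        # the slice indices 0-3 used in the step function below. The peephole
        # tensor only holds the three gates with cell connections:
        # [ingate | forgetgate | outgate].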
        if self.peepholes:
            self.W_cell_to_gates_fwd.name = "W_cell_to_gates_fwd"
            self.W_cell_to_gates_bck.name = "W_cell_to_gates_bck"
        self.b_gates_fwd.name = "b_gates_fwd"
        self.W_hid_to_gates_fwd.name = "W_hid_to_gates_fwd"
        self.W_in_to_gates_fwd.name = "W_in_to_gates_fwd"
        self.cell_init_fwd.name = "cell_init_fwd"
        self.hid_init_fwd.name = "hid_init_fwd"
        self.b_gates_bck.name = "b_gates_bck"
        self.W_hid_to_gates_bck.name = "W_hid_to_gates_bck"
        self.W_in_to_gates_bck.name = "W_in_to_gates_bck"
        self.cell_init_bck.name = "cell_init_bck"
        self.hid_init_bck.name = "hid_init_bck"
    def get_params(self):
        '''
        Get all parameters of this layer.

        :returns:
            - params : list of theano.shared
                List of all parameters
        '''
        params = self.get_weight_params() + self.get_bias_params()
        if self.peepholes:
            params.extend(self.get_peephole_params())
        if self.learn_init:
            params.extend(self.get_init_params())
        return params
    def get_weight_params(self):
        '''
        Get all weights of this layer.

        :returns:
            - weight_params : list of theano.shared
                List of all weight parameters
        '''
        return [self.W_in_to_gates_fwd, self.W_hid_to_gates_fwd,
                self.W_in_to_gates_bck, self.W_hid_to_gates_bck]

    def get_peephole_params(self):
        '''
        Get all peephole parameters of this layer.

        :returns:
            - peephole_params : list of theano.shared
                List of all peephole parameters
        '''
        return [self.W_cell_to_gates_fwd, self.W_cell_to_gates_bck]
    def get_init_params(self):
        '''
        Get all initial state parameters of this layer.

        :returns:
            - init_params : list of theano.shared
                List of all initial state parameters
        '''
        return [self.hid_init_fwd, self.cell_init_fwd,
                self.hid_init_bck, self.cell_init_bck]
    def get_bias_params(self):
        '''
        Get all bias parameters of this layer.

        :returns:
            - bias_params : list of theano.shared
                List of all bias parameters
        '''
        return [self.b_gates_fwd, self.b_gates_bck]
    def get_output_shape_for(self, input_shape):
        '''
        Compute the expected output shape given the input.

        :parameters:
            - input_shape : tuple
                Dimensionality of expected input

        :returns:
            - output_shape : tuple
                Dimensionality of expected outputs given input_shape. The last
                dimension is 2*num_units because the forward and backward
                outputs are concatenated along the feature axis.
        '''
        return (input_shape[0], input_shape[1], 2*self.num_units)
    def get_output_for(self, input_fwd, mask=None, *args, **kwargs):
        '''
        Compute this layer's output function given a symbolic input variable.

        :parameters:
            - input_fwd : theano.TensorType
                Symbolic input variable
            - mask : theano.TensorType
                Theano variable denoting whether each time step in each
                sequence in the batch is part of the sequence or not. This is
                needed when scanning backwards. If all sequences are of the
                same length, it should be all 1s.

        :returns:
            - layer_output : theano.TensorType
                Symbolic output variable
        '''
        # Treat all dimensions after the first two as flattened feature
        # dimensions
        assert mask is not None
        if input_fwd.ndim > 3:
            input_fwd = input_fwd.reshape(
                (input_fwd.shape[0], input_fwd.shape[1],
                 T.prod(input_fwd.shape[2:])))
        # Precompute input*W and dimshuffle.
        # Input is provided as (n_batch, n_time_steps, n_features).
        # W_in_to_gates is (n_features, 4*num_units), so input dot W is
        # (n_batch, n_time_steps, 4*num_units). Because scan iterates over
        # the first dimension we dimshuffle to
        # (n_time_steps, n_batch, 4*num_units).
        # Flip the input and the mask along the time axis to obtain the
        # backward sequence.
        input_bck = input_fwd[:, ::-1, :]
        mask_bck = mask[:, ::-1]

        input_dot_W_fwd = T.dot(input_fwd, self.W_in_to_gates_fwd).dimshuffle(1, 0, 2)
        input_dot_W_bck = T.dot(input_bck, self.W_in_to_gates_bck).dimshuffle(1, 0, 2)
        input_dot_W_fwd += self.b_gates_fwd
        input_dot_W_bck += self.b_gates_bck

        # mask is given as (batch_size, seq_len). Because scan iterates over
        # the first dim. we dimshuffle to (seq_len, batch_size) and add a
        # broadcastable dimension
        mask_bck = mask_bck.dimshuffle(1, 0, 'x')

        # input_dot_W is (n_time_steps, n_batch, 4*num_units). We define a
        # slicing function that extracts the input to each LSTM gate.
        # slice_c is similar but for the peephole weights.
        def slice_w(x, n):
            return x[:, n*self.num_units:(n+1)*self.num_units]

        def slice_c(x, n):
            return x[n*self.num_units:(n+1)*self.num_units]
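        # For example, with the gate layout above slice_w(gates, 1) returns
        # columns [num_units:2*num_units] of the stacked pre-activations,
        # i.e. the forget-gate pre-activation for every sequence in the batch.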
        # Create the single recurrent computation step function.
        # input_dot_W_n is the n'th time step of the precomputed input dot W.
        # The step function calculates the following:
        #
        # i_t = \sigma(W_{xi}x_t + W_{hi}h_{t-1} + W_{ci}c_{t-1} + b_i)
        # f_t = \sigma(W_{xf}x_t + W_{hf}h_{t-1} + W_{cf}c_{t-1} + b_f)
        # c_t = f_t c_{t-1} + i_t \tanh(W_{xc}x_t + W_{hc}h_{t-1} + b_c)
        # o_t = \sigma(W_{xo}x_t + W_{ho}h_{t-1} + W_{co}c_t + b_o)
        # h_t = o_t \tanh(c_t)
        #
        # Gate names are taken from http://arxiv.org/abs/1409.2329, figure 1.
        def dostep(input_dot_W_n, cell_previous, hid_previous,
                   W_hid_to_gates, W_cell_to_gates):
            # calculate the gate pre-activations and slice them per gate
            gates = input_dot_W_n + T.dot(hid_previous, W_hid_to_gates)
            ingate = slice_w(gates, 0)
            forgetgate = slice_w(gates, 1)
            modulationgate = slice_w(gates, 2)
            outgate = slice_w(gates, 3)

            if self.peepholes:
                # peephole connections from the previous cell state
                ingate += cell_previous*slice_c(W_cell_to_gates, 0)
                forgetgate += cell_previous*slice_c(W_cell_to_gates, 1)

            ingate = self.nonlinearity_ingate(ingate)
            forgetgate = self.nonlinearity_forgetgate(forgetgate)
            modulationgate = self.nonlinearity_modulationgate(modulationgate)

            cell = forgetgate*cell_previous + ingate*modulationgate

            if self.peepholes:
                # the output-gate peephole uses the updated cell state c_t
                outgate += cell*slice_c(W_cell_to_gates, 2)
            outgate = self.nonlinearity_outgate(outgate)

            hid = outgate*self.nonlinearity_out(cell)
            return cell, hid
        def step(input_dot_W_fwd_n, input_dot_W_bck_n, mask_bck,
                 cell_previous_fwd, hid_previous_fwd,
                 cell_previous_bck, hid_previous_bck):
            # forward direction
            cell_fwd, hid_fwd = dostep(
                input_dot_W_fwd_n, cell_previous_fwd, hid_previous_fwd,
                self.W_hid_to_gates_fwd, self.W_cell_to_gates_fwd)
            # backward direction
            cell_bck, hid_bck = dostep(
                input_dot_W_bck_n, cell_previous_bck, hid_previous_bck,
                self.W_hid_to_gates_bck, self.W_cell_to_gates_bck)
            # If mask is 0, use the previous state until mask = 1 is found.
            # This propagates the layer's initial state when moving backwards
            # until the end of the sequence is found.
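            # Example (hypothetical): a sequence of length 3 padded to length
            # 5 has mask [1, 1, 1, 0, 0]; reversed it is [0, 0, 1, 1, 1], so
            # the first two backward steps simply carry cell_init_bck and
            # hid_init_bck forward until the true end of the sequence.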
            not_mask_bck = 1 - mask_bck
            cell_bck = cell_bck*mask_bck + cell_previous_bck*not_mask_bck
            hid_bck = hid_bck*mask_bck + hid_previous_bck*not_mask_bck

            return [cell_fwd, hid_fwd, cell_bck, hid_bck]
        sequences = [input_dot_W_fwd, input_dot_W_bck, mask_bck]
        init = [self.cell_init_fwd, self.hid_init_fwd,
                self.cell_init_bck, self.hid_init_bck]

        # The scan op iterates over the first dimension of the sequences and
        # repeatedly applies the step function
        scan_out = theano.scan(step, sequences=sequences, outputs_info=init)

        # each output is (n_time_steps, n_batch, n_units)
        output_fwd = scan_out[0][1]  # hid_fwd
        output_bck = scan_out[0][3]  # hid_bck

        # reverse the backward output along the time axis so it is aligned
        # with the forward output
        output_bck = output_bck[::-1, :, :]

        # concatenate fwd and bck along the feature axis
        output = T.concatenate([output_fwd, output_bck], axis=2)

        # Now, dimshuffle back to (n_batch, n_time_steps, 2*n_units)
        output = output.dimshuffle(1, 0, 2)
        return output
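

# ---------------------------------------------------------------------------
# Usage sketch (an assumption, not part of the layer above): builds the layer
# on top of an InputLayer and compiles a function returning the concatenated
# forward/backward hidden states. Assumes the old-style Lasagne API the layer
# itself relies on (`init`, `nonlinearities`, `Layer.get_output_shape`), plus
# `lasagne.layers.InputLayer`, `theano` and `theano.tensor as T` imported at
# the top of the file.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    import numpy as np

    num_batch, seq_len, num_features, num_units = 2, 5, 3, 4
    l_in = InputLayer(shape=(num_batch, seq_len, num_features))
    l_lstm = BidirectionalLSTMLayer(l_in, num_units=num_units)

    x_sym = T.tensor3('x')       # (num_batch, seq_len, num_features)
    mask_sym = T.matrix('mask')  # (num_batch, seq_len); 1 marks real steps
    out_sym = l_lstm.get_output_for(x_sym, mask=mask_sym)

    f = theano.function([x_sym, mask_sym], out_sym)
    x = np.random.randn(num_batch, seq_len, num_features).astype(theano.config.floatX)
    mask = np.ones((num_batch, seq_len), dtype=theano.config.floatX)
    # output is (num_batch, seq_len, 2*num_units): fwd and bck concatenated
    print(f(x, mask).shape)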