# Created July 21, 2016 10:11
# Lasagne LSTM with dropout
from lasagne import *
from lasagne.layers import *
from lasagne.random import get_rng
from lasagne.utils import *
import numpy as np
import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams
class DropoutLSTMLayer(MergeLayer):
    r"""
    DropoutLSTMLayer(incoming, num_units,
    ingate=lasagne.layers.Gate(), forgetgate=lasagne.layers.Gate(),
    cell=lasagne.layers.Gate(
    W_cell=None, nonlinearity=lasagne.nonlinearities.tanh),
    outgate=lasagne.layers.Gate(),
    nonlinearity=lasagne.nonlinearities.tanh,
    cell_init=lasagne.init.Constant(0.),
    hid_init=lasagne.init.Constant(0.), backwards=False, learn_init=False,
    peepholes=True, gradient_steps=-1, grad_clipping=0, unroll_scan=False,
    precompute_input=True, mask_input=None, only_return_final=False,
    dropout=0., **kwargs)

    A long short-term memory (LSTM) layer with dropout on the recurrent
    (hidden-to-hidden) connection.

    Includes optional "peephole connections" and a forget gate. Based on the
    definition in [1]_, which is the current common definition. The output is
    computed by

    .. math ::

        i_t &= \sigma_i(x_t W_{xi} + h_{t-1} W_{hi}
               + w_{ci} \odot c_{t-1} + b_i)\\
        f_t &= \sigma_f(x_t W_{xf} + h_{t-1} W_{hf}
               + w_{cf} \odot c_{t-1} + b_f)\\
        c_t &= f_t \odot c_{t - 1}
               + i_t \odot \sigma_c(x_t W_{xc} + h_{t-1} W_{hc} + b_c)\\
        o_t &= \sigma_o(x_t W_{xo} + h_{t-1} W_{ho} + w_{co} \odot c_t + b_o)\\
        h_t &= o_t \odot \sigma_h(c_t)

    When dropout is active, every occurrence of :math:`h_{t-1}` on the
    right-hand side is replaced by :math:`h_{t-1} \odot m`, where :math:`m`
    is a rescaled binary mask that is sampled once per forward pass and
    shared by all time steps.

    Parameters
    ----------
    incoming : a :class:`lasagne.layers.Layer` instance or a tuple
        The layer feeding into this layer, or the expected input shape.
    num_units : int
        Number of hidden/cell units in the layer.
    ingate : Gate
        Parameters for the input gate (:math:`i_t`): :math:`W_{xi}`,
        :math:`W_{hi}`, :math:`w_{ci}`, :math:`b_i`, and :math:`\sigma_i`.
    forgetgate : Gate
        Parameters for the forget gate (:math:`f_t`): :math:`W_{xf}`,
        :math:`W_{hf}`, :math:`w_{cf}`, :math:`b_f`, and :math:`\sigma_f`.
    cell : Gate
        Parameters for the cell computation (:math:`c_t`): :math:`W_{xc}`,
        :math:`W_{hc}`, :math:`b_c`, and :math:`\sigma_c`.
    outgate : Gate
        Parameters for the output gate (:math:`o_t`): :math:`W_{xo}`,
        :math:`W_{ho}`, :math:`w_{co}`, :math:`b_o`, and :math:`\sigma_o`.
    nonlinearity : callable or None
        The nonlinearity that is applied to the output (:math:`\sigma_h`). If
        None is provided, no nonlinearity will be applied.
    cell_init : callable, np.ndarray, theano.shared or :class:`Layer`
        Initializer for initial cell state (:math:`c_0`).
    hid_init : callable, np.ndarray, theano.shared or :class:`Layer`
        Initializer for initial hidden state (:math:`h_0`).
    backwards : bool
        If True, process the sequence backwards and then reverse the
        output again such that the output from the layer is always
        from :math:`x_1` to :math:`x_n`.
    learn_init : bool
        If True, initial hidden values are learned.
    peepholes : bool
        If True, the LSTM uses peephole connections.
        When False, `ingate.W_cell`, `forgetgate.W_cell` and
        `outgate.W_cell` are ignored.
    gradient_steps : int
        Number of timesteps to include in the backpropagated gradient.
        If -1, backpropagate through the entire sequence.
    grad_clipping : float
        If nonzero, the gradient messages are clipped to the given value during
        the backward pass.  See [1]_ (p. 6) for further explanation.
    unroll_scan : bool
        If True the recursion is unrolled instead of using scan. For some
        graphs this gives a significant speed up but it might also consume
        more memory. When `unroll_scan` is True, backpropagation always
        includes the full sequence, so `gradient_steps` must be set to -1 and
        the input sequence length must be known at compile time (i.e., cannot
        be given as None).
    precompute_input : bool
        If True, precompute input_to_hid before iterating through
        the sequence. This can result in a speedup at the expense of
        an increase in memory usage.
    mask_input : :class:`lasagne.layers.Layer`
        Layer which allows for a sequence mask to be input, for when sequences
        are of variable length.  Default `None`, which means no mask will be
        supplied (i.e. all sequences are of the same length).
    only_return_final : bool
        If True, only return the final sequential output (e.g. for tasks where
        a single target value for the entire sequence is desired).  In this
        case, Theano makes an optimization which saves memory.
    dropout : float
        Probability of zeroing a unit of the previous hidden state before it
        is multiplied with the hidden-to-hidden weights. Must satisfy
        ``0 <= dropout < 1``; 0 disables dropout. Kept units are rescaled by
        ``1 / (1 - dropout)``.

    Raises
    ------
    ValueError
        If `dropout` is outside ``[0, 1)``, if `unroll_scan` is True while
        `gradient_steps` is not -1, or if `unroll_scan` is True and the input
        sequence length is None.

    References
    ----------
    .. [1] Graves, Alex: "Generating sequences with recurrent neural networks."
           arXiv preprint arXiv:1308.0850 (2013).
    """
    def __init__(self, incoming, num_units,
                 ingate=Gate(),
                 forgetgate=Gate(),
                 cell=Gate(W_cell=None, nonlinearity=nonlinearities.tanh),
                 outgate=Gate(),
                 nonlinearity=nonlinearities.tanh,
                 cell_init=init.Constant(0.),
                 hid_init=init.Constant(0.),
                 backwards=False,
                 learn_init=False,
                 peepholes=True,
                 gradient_steps=-1,
                 grad_clipping=0,
                 unroll_scan=False,
                 precompute_input=True,
                 mask_input=None,
                 only_return_final=False,
                 dropout=0.,
                 **kwargs):

        # This layer inherits from a MergeLayer, because it can have four
        # inputs - the layer input, the mask, the initial hidden state and the
        # initial cell state. We will just provide the layer input as
        # incomings, unless a mask input, initial hidden state or initial cell
        # state was provided.
        incomings = [incoming]
        self.mask_incoming_index = -1
        self.hid_init_incoming_index = -1
        self.cell_init_incoming_index = -1
        # Each optional input layer must be appended *before* its index is
        # recorded; get_output_for relies on these indices being > 0.
        if mask_input is not None:
            incomings.append(mask_input)
            self.mask_incoming_index = len(incomings)-1
        if isinstance(hid_init, Layer):
            incomings.append(hid_init)
            self.hid_init_incoming_index = len(incomings)-1
        if isinstance(cell_init, Layer):
            incomings.append(cell_init)
            self.cell_init_incoming_index = len(incomings)-1

        # Initialize parent layer
        super(DropoutLSTMLayer, self).__init__(incomings, **kwargs)

        # If the provided nonlinearity is None, make it linear
        if nonlinearity is None:
            self.nonlinearity = nonlinearities.identity
        else:
            self.nonlinearity = nonlinearity

        self.learn_init = learn_init
        self.num_units = num_units
        self.backwards = backwards
        self.peepholes = peepholes
        self.gradient_steps = gradient_steps
        self.grad_clipping = grad_clipping
        self.unroll_scan = unroll_scan
        self.precompute_input = precompute_input
        self.only_return_final = only_return_final

        # Random stream used to sample the recurrent dropout mask, seeded
        # from Lasagne's global RNG.
        self._srng = RandomStreams(get_rng().randint(1, 2147462579))

        if 0. <= dropout < 1.:
            self.dropout = dropout
        else:
            raise ValueError("dropout must be between 0 and 1.")

        if unroll_scan and gradient_steps != -1:
            raise ValueError(
                "Gradient steps must be -1 when unroll_scan is true.")

        # Retrieve the dimensionality of the incoming layer
        input_shape = self.input_shapes[0]

        if unroll_scan and input_shape[1] is None:
            raise ValueError("Input sequence length cannot be specified as "
                             "None when unroll_scan is True")

        # All dimensions after (batch, time) are treated as features.
        num_inputs = np.prod(input_shape[2:])

        def add_gate_params(gate, gate_name):
            """ Convenience function for adding layer parameters from a Gate
            instance. """
            return (self.add_param(gate.W_in, (num_inputs, num_units),
                                   name="W_in_to_{}".format(gate_name)),
                    self.add_param(gate.W_hid, (num_units, num_units),
                                   name="W_hid_to_{}".format(gate_name)),
                    self.add_param(gate.b, (num_units,),
                                   name="b_{}".format(gate_name),
                                   regularizable=False),
                    gate.nonlinearity)

        # Add in parameters from the supplied Gate instances
        (self.W_in_to_ingate, self.W_hid_to_ingate, self.b_ingate,
         self.nonlinearity_ingate) = add_gate_params(ingate, 'ingate')

        (self.W_in_to_forgetgate, self.W_hid_to_forgetgate, self.b_forgetgate,
         self.nonlinearity_forgetgate) = add_gate_params(forgetgate,
                                                         'forgetgate')

        (self.W_in_to_cell, self.W_hid_to_cell, self.b_cell,
         self.nonlinearity_cell) = add_gate_params(cell, 'cell')

        (self.W_in_to_outgate, self.W_hid_to_outgate, self.b_outgate,
         self.nonlinearity_outgate) = add_gate_params(outgate, 'outgate')

        # If peephole (cell to gate) connections were enabled, initialize
        # peephole connections.  These are elementwise products with the cell
        # state, so they are represented as vectors.
        if self.peepholes:
            self.W_cell_to_ingate = self.add_param(
                ingate.W_cell, (num_units, ), name="W_cell_to_ingate")

            self.W_cell_to_forgetgate = self.add_param(
                forgetgate.W_cell, (num_units, ), name="W_cell_to_forgetgate")

            self.W_cell_to_outgate = self.add_param(
                outgate.W_cell, (num_units, ), name="W_cell_to_outgate")

        # Setup initial values for the cell and the hidden units
        if isinstance(cell_init, Layer):
            self.cell_init = cell_init
        else:
            self.cell_init = self.add_param(
                cell_init, (1, num_units), name="cell_init",
                trainable=learn_init, regularizable=False)

        if isinstance(hid_init, Layer):
            self.hid_init = hid_init
        else:
            self.hid_init = self.add_param(
                hid_init, (1, self.num_units), name="hid_init",
                trainable=learn_init, regularizable=False)

    def get_output_shape_for(self, input_shapes):
        """
        Compute the output shape from the shapes of this layer's inputs.

        Parameters
        ----------
        input_shapes : list of tuple
            Shapes of all inputs; only ``input_shapes[0]`` (the sequence
            input) is used.

        Returns
        -------
        tuple
            ``(n_batch, num_units)`` when `only_return_final` is set,
            otherwise ``(n_batch, n_steps, num_units)``.
        """
        # The shape of the input to this layer will be the first element
        # of input_shapes, whether or not a mask input is being used.
        input_shape = input_shapes[0]
        # When only_return_final is true, the second (sequence step) dimension
        # will be flattened
        if self.only_return_final:
            return input_shape[0], self.num_units
        # Otherwise, the shape will be (n_batch, n_steps, num_units)
        else:
            return input_shape[0], input_shape[1], self.num_units

    def get_output_for(self, inputs, deterministic=False, **kwargs):
        """
        Compute this layer's output function given a symbolic input variable

        Parameters
        ----------
        inputs : list of theano.TensorType
            `inputs[0]` should always be the symbolic input variable.  When
            this layer has a mask input (i.e. was instantiated with
            `mask_input != None`, indicating that the lengths of sequences in
            each batch vary), `inputs` should have length 2, where `inputs[1]`
            is the `mask`.  The `mask` should be supplied as a Theano variable
            denoting whether each time step in each sequence in the batch is
            part of the sequence or not.  `mask` should be a matrix of shape
            ``(n_batch, n_time_steps)`` where ``mask[i, j] = 1`` when ``j <=
            (length of sequence i)`` and ``mask[i, j] = 0`` when ``j > (length
            of sequence i)``. When the hidden state of this layer is to be
            pre-filled (i.e. was set to a :class:`Layer` instance) `inputs`
            should have length at least 2, and `inputs[-1]` is the hidden state
            to prefill with. When the cell state of this layer is to be
            pre-filled (i.e. was set to a :class:`Layer` instance) `inputs`
            should have length at least 2, and `inputs[-1]` is the cell state
            to prefill with. When both the cell state and the hidden state are
            being pre-filled `inputs[-2]` is the hidden state, while
            `inputs[-1]` is the cell state.
        deterministic : boolean
            Whether or not to produce deterministic outputs. When True, no
            dropout is applied to the recurrent connection.

        Returns
        -------
        layer_output : theano.TensorType
            Symbolic output variable.
        """
        # Retrieve the layer input
        input = inputs[0]
        # Retrieve the mask when it is supplied
        mask = None
        hid_init = None
        cell_init = None
        if self.mask_incoming_index > 0:
            mask = inputs[self.mask_incoming_index]
        if self.hid_init_incoming_index > 0:
            hid_init = inputs[self.hid_init_incoming_index]
        if self.cell_init_incoming_index > 0:
            cell_init = inputs[self.cell_init_incoming_index]

        # Treat all dimensions after the second as flattened feature dimensions
        if input.ndim > 3:
            input = T.flatten(input, 3)

        # Because scan iterates over the first dimension we dimshuffle to
        # (n_time_steps, n_batch, n_features)
        input = input.dimshuffle(1, 0, 2)
        num_batch = input.shape[1]

        # Stack input weight matrices into a (num_inputs, 4*num_units)
        # matrix, which speeds up computation
        W_in_stacked = T.concatenate(
            [self.W_in_to_ingate, self.W_in_to_forgetgate,
             self.W_in_to_cell, self.W_in_to_outgate], axis=1)

        # Same for hidden weight matrices
        W_hid_stacked = T.concatenate(
            [self.W_hid_to_ingate, self.W_hid_to_forgetgate,
             self.W_hid_to_cell, self.W_hid_to_outgate], axis=1)

        # Stack biases into a (4*num_units) vector
        b_stacked = T.concatenate(
            [self.b_ingate, self.b_forgetgate,
             self.b_cell, self.b_outgate], axis=0)

        # The dropout mask is built once, outside of scan, so the same units
        # are dropped at every time step. It multiplies hid_previous, hence
        # its shape must be (num_batch, num_units).
        if self.dropout > 0 and not deterministic:
            # Using a theano constant to prevent upcasting
            retain_prob = T.constant(1) - self.dropout
            # Each unit is *kept* with probability retain_prob and rescaled
            # so that the expected value of the masked state is unchanged.
            dropout_mask = self._srng.binomial(
                (num_batch, self.num_units), p=retain_prob,
                dtype=input.dtype) / retain_prob
        else:
            # No dropout: an all-ones mask keeps the step function uniform.
            dropout_mask = T.ones((num_batch, self.num_units),
                                  dtype=input.dtype)

        if self.precompute_input:
            # Because the input is given for all time steps, we can
            # precompute_input the inputs dot weight matrices before scanning.
            # W_in_stacked is (n_features, 4*num_units). input is then
            # (n_time_steps, n_batch, 4*num_units).
            input = T.dot(input, W_in_stacked) + b_stacked

        # At each call to scan, input_n will be (n_batch, 4*num_units).
        # We define a slicing function that extract the input to each LSTM gate
        def slice_w(x, n):
            return x[:, n*self.num_units:(n+1)*self.num_units]

        # Create single recurrent computation step function
        # input_n is the n'th vector of the input
        def step(input_n, cell_previous, hid_previous, *args):
            if not self.precompute_input:
                input_n = T.dot(input_n, W_in_stacked) + b_stacked

            # Calculate gates pre-activations and slice; dropout is applied
            # to the previous hidden state on the recurrent connection only
            gates = input_n + T.dot(hid_previous * dropout_mask,
                                    W_hid_stacked)

            # Clip gradients
            if self.grad_clipping:
                gates = theano.gradient.grad_clip(
                    gates, -self.grad_clipping, self.grad_clipping)

            # Extract the pre-activation gate values
            ingate = slice_w(gates, 0)
            forgetgate = slice_w(gates, 1)
            cell_input = slice_w(gates, 2)
            outgate = slice_w(gates, 3)

            if self.peepholes:
                # Compute peephole connections
                ingate += cell_previous*self.W_cell_to_ingate
                forgetgate += cell_previous*self.W_cell_to_forgetgate

            # Apply nonlinearities
            ingate = self.nonlinearity_ingate(ingate)
            forgetgate = self.nonlinearity_forgetgate(forgetgate)
            cell_input = self.nonlinearity_cell(cell_input)

            # Compute new cell value
            cell = forgetgate*cell_previous + ingate*cell_input

            if self.peepholes:
                outgate += cell*self.W_cell_to_outgate
            outgate = self.nonlinearity_outgate(outgate)

            # Compute new hidden unit activation
            hid = outgate*self.nonlinearity(cell)
            return [cell, hid]

        def step_masked(input_n, mask_n, cell_previous, hid_previous, *args):
            cell, hid = step(input_n, cell_previous, hid_previous, *args)

            # Skip over any input with mask 0 by copying the previous
            # hidden state; proceed normally for any input with mask 1.
            cell = T.switch(mask_n, cell, cell_previous)
            hid = T.switch(mask_n, hid, hid_previous)

            return [cell, hid]

        if mask is not None:
            # mask is given as (batch_size, seq_len). Because scan iterates
            # over first dimension, we dimshuffle to (seq_len, batch_size) and
            # add a broadcastable dimension
            mask = mask.dimshuffle(1, 0, 'x')
            sequences = [input, mask]
            step_fun = step_masked
        else:
            sequences = input
            step_fun = step

        ones = T.ones((num_batch, 1))
        if not isinstance(self.cell_init, Layer):
            # Dot against a 1s vector to repeat to shape (num_batch, num_units)
            cell_init = T.dot(ones, self.cell_init)

        if not isinstance(self.hid_init, Layer):
            # Dot against a 1s vector to repeat to shape (num_batch, num_units)
            hid_init = T.dot(ones, self.hid_init)

        # The hidden-to-hidden weight matrix and the dropout mask are always
        # used in step
        non_seqs = [W_hid_stacked, dropout_mask]
        # The "peephole" weight matrices are only used when self.peepholes=True
        if self.peepholes:
            non_seqs += [self.W_cell_to_ingate,
                         self.W_cell_to_forgetgate,
                         self.W_cell_to_outgate]

        # When we aren't precomputing the input outside of scan, we need to
        # provide the input weights and biases to the step function
        if not self.precompute_input:
            non_seqs += [W_in_stacked, b_stacked]

        if self.unroll_scan:
            # Retrieve the dimensionality of the incoming layer
            input_shape = self.input_shapes[0]
            # Explicitly unroll the recurrence instead of using scan
            cell_out, hid_out = unroll_scan(
                fn=step_fun,
                sequences=sequences,
                outputs_info=[cell_init, hid_init],
                go_backwards=self.backwards,
                non_sequences=non_seqs,
                n_steps=input_shape[1])
        else:
            # Scan op iterates over first dimension of input and repeatedly
            # applies the step function
            cell_out, hid_out = theano.scan(
                fn=step_fun,
                sequences=sequences,
                outputs_info=[cell_init, hid_init],
                go_backwards=self.backwards,
                truncate_gradient=self.gradient_steps,
                non_sequences=non_seqs,
                strict=True)[0]

        # When it is requested that we only return the final sequence step,
        # we need to slice it out immediately after scan is applied
        if self.only_return_final:
            hid_out = hid_out[-1]
        else:
            # dimshuffle back to (n_batch, n_time_steps, n_features))
            hid_out = hid_out.dimshuffle(1, 0, 2)

            # if scan is backward reverse the output
            if self.backwards:
                hid_out = hid_out[:, ::-1]

        return hid_out
