Lasagne LSTM with dropout
import numpy as np
import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams

from lasagne import init, nonlinearities
from lasagne.layers import Gate, Layer, MergeLayer
from lasagne.random import get_rng
from lasagne.utils import unroll_scan
class DropoutLSTMLayer(MergeLayer):
    r"""
    DropoutLSTMLayer(incoming, num_units,
    ingate=lasagne.layers.Gate(), forgetgate=lasagne.layers.Gate(),
    cell=lasagne.layers.Gate(
        W_cell=None, nonlinearity=lasagne.nonlinearities.tanh),
    outgate=lasagne.layers.Gate(),
    nonlinearity=lasagne.nonlinearities.tanh,
    cell_init=lasagne.init.Constant(0.),
    hid_init=lasagne.init.Constant(0.), backwards=False, learn_init=False,
    peepholes=True, gradient_steps=-1, grad_clipping=0, unroll_scan=False,
    precompute_input=True, mask_input=None, only_return_final=False,
    dropout=0., **kwargs)

    A long short-term memory (LSTM) layer with dropout on the recurrent
    connections.

    Includes optional "peephole connections" and a forget gate. Based on the
    definition in [1]_, which is the current common definition. A single
    dropout mask is sampled per sequence and applied to the hidden state
    :math:`h_{t-1}` at every time step. The output is computed by

    .. math ::

        i_t &= \sigma_i(x_t W_{xi} + h_{t-1} W_{hi}
               + w_{ci} \odot c_{t-1} + b_i)\\
        f_t &= \sigma_f(x_t W_{xf} + h_{t-1} W_{hf}
               + w_{cf} \odot c_{t-1} + b_f)\\
        c_t &= f_t \odot c_{t - 1}
               + i_t \odot \sigma_c(x_t W_{xc} + h_{t-1} W_{hc} + b_c)\\
        o_t &= \sigma_o(x_t W_{xo} + h_{t-1} W_{ho} + w_{co} \odot c_t + b_o)\\
        h_t &= o_t \odot \sigma_h(c_t)

    Parameters
    ----------
    incoming : a :class:`lasagne.layers.Layer` instance or a tuple
        The layer feeding into this layer, or the expected input shape.
    num_units : int
        Number of hidden/cell units in the layer.
    ingate : Gate
        Parameters for the input gate (:math:`i_t`): :math:`W_{xi}`,
        :math:`W_{hi}`, :math:`w_{ci}`, :math:`b_i`, and :math:`\sigma_i`.
    forgetgate : Gate
        Parameters for the forget gate (:math:`f_t`): :math:`W_{xf}`,
        :math:`W_{hf}`, :math:`w_{cf}`, :math:`b_f`, and :math:`\sigma_f`.
    cell : Gate
        Parameters for the cell computation (:math:`c_t`): :math:`W_{xc}`,
        :math:`W_{hc}`, :math:`b_c`, and :math:`\sigma_c`.
    outgate : Gate
        Parameters for the output gate (:math:`o_t`): :math:`W_{xo}`,
        :math:`W_{ho}`, :math:`w_{co}`, :math:`b_o`, and :math:`\sigma_o`.
    nonlinearity : callable or None
        The nonlinearity that is applied to the output (:math:`\sigma_h`). If
        None is provided, no nonlinearity will be applied.
    cell_init : callable, np.ndarray, theano.shared or :class:`Layer`
        Initializer for the initial cell state (:math:`c_0`).
    hid_init : callable, np.ndarray, theano.shared or :class:`Layer`
        Initializer for the initial hidden state (:math:`h_0`).
    backwards : bool
        If True, process the sequence backwards and then reverse the
        output again such that the output from the layer is always
        from :math:`x_1` to :math:`x_n`.
    learn_init : bool
        If True, initial hidden values are learned.
    peepholes : bool
        If True, the LSTM uses peephole connections.
        When False, `ingate.W_cell`, `forgetgate.W_cell` and
        `outgate.W_cell` are ignored.
    gradient_steps : int
        Number of timesteps to include in the backpropagated gradient.
        If -1, backpropagate through the entire sequence.
    grad_clipping : float
        If nonzero, the gradient messages are clipped to the given value
        during the backward pass. See [1]_ (p. 6) for further explanation.
    unroll_scan : bool
        If True, the recursion is unrolled instead of using scan. For some
        graphs this gives a significant speedup, but it might also consume
        more memory. When `unroll_scan` is True, backpropagation always
        includes the full sequence, so `gradient_steps` must be set to -1 and
        the input sequence length must be known at compile time (i.e., cannot
        be given as None).
    precompute_input : bool
        If True, precompute input_to_hid before iterating through
        the sequence. This can result in a speedup at the expense of
        an increase in memory usage.
    mask_input : :class:`lasagne.layers.Layer`
        Layer which allows for a sequence mask to be input, for when sequences
        are of variable length. Default is `None`, which means no mask will be
        supplied (i.e. all sequences are of the same length).
    only_return_final : bool
        If True, only return the final sequential output (e.g. for tasks where
        a single target value for the entire sequence is desired). In this
        case, Theano makes an optimization which saves memory.
    dropout : float
        Probability in [0, 1) of dropping each unit of the hidden state
        :math:`h_{t-1}` before the recurrent weight multiplication. The mask
        is sampled once per sequence and shared across time steps; surviving
        units are rescaled by 1/(1 - dropout).

    References
    ----------
    .. [1] Graves, Alex: "Generating sequences with recurrent neural
           networks." arXiv preprint arXiv:1308.0850 (2013).
    """
    def __init__(self, incoming, num_units,
                 ingate=Gate(),
                 forgetgate=Gate(),
                 cell=Gate(W_cell=None, nonlinearity=nonlinearities.tanh),
                 outgate=Gate(),
                 nonlinearity=nonlinearities.tanh,
                 cell_init=init.Constant(0.),
                 hid_init=init.Constant(0.),
                 backwards=False,
                 learn_init=False,
                 peepholes=True,
                 gradient_steps=-1,
                 grad_clipping=0,
                 unroll_scan=False,
                 precompute_input=True,
                 mask_input=None,
                 only_return_final=False,
                 dropout=0.,
                 **kwargs):
        # This layer inherits from a MergeLayer, because it can have four
        # inputs - the layer input, the mask, the initial hidden state and
        # the initial cell state. We will just provide the layer input as
        # incomings, unless a mask input, initial hidden state or initial
        # cell state was provided.
        incomings = [incoming]
        self.mask_incoming_index = -1
        self.hid_init_incoming_index = -1
        self.cell_init_incoming_index = -1
        if mask_input is not None:
            incomings.append(mask_input)
            self.mask_incoming_index = len(incomings)-1
        if isinstance(hid_init, Layer):
            incomings.append(hid_init)
            self.hid_init_incoming_index = len(incomings)-1
        if isinstance(cell_init, Layer):
            incomings.append(cell_init)
            self.cell_init_incoming_index = len(incomings)-1

        # Initialize parent layer
        super(DropoutLSTMLayer, self).__init__(incomings, **kwargs)

        # If the provided nonlinearity is None, make it linear
        if nonlinearity is None:
            self.nonlinearity = nonlinearities.identity
        else:
            self.nonlinearity = nonlinearity

        self.learn_init = learn_init
        self.num_units = num_units
        self.backwards = backwards
        self.peepholes = peepholes
        self.gradient_steps = gradient_steps
        self.grad_clipping = grad_clipping
        self.unroll_scan = unroll_scan
        self.precompute_input = precompute_input
        self.only_return_final = only_return_final
        self._srng = RandomStreams(get_rng().randint(1, 2147462579))

        if 0. <= dropout < 1.:
            self.dropout = dropout
        else:
            raise ValueError("dropout must be in the interval [0, 1).")

        if unroll_scan and gradient_steps != -1:
            raise ValueError(
                "Gradient steps must be -1 when unroll_scan is true.")

        # Retrieve the dimensionality of the incoming layer
        input_shape = self.input_shapes[0]

        if unroll_scan and input_shape[1] is None:
            raise ValueError("Input sequence length cannot be specified as "
                             "None when unroll_scan is True")

        num_inputs = np.prod(input_shape[2:])
        def add_gate_params(gate, gate_name):
            """ Convenience function for adding layer parameters from a Gate
            instance. """
            return (self.add_param(gate.W_in, (num_inputs, num_units),
                                   name="W_in_to_{}".format(gate_name)),
                    self.add_param(gate.W_hid, (num_units, num_units),
                                   name="W_hid_to_{}".format(gate_name)),
                    self.add_param(gate.b, (num_units,),
                                   name="b_{}".format(gate_name),
                                   regularizable=False),
                    gate.nonlinearity)

        # Add in parameters from the supplied Gate instances
        (self.W_in_to_ingate, self.W_hid_to_ingate, self.b_ingate,
         self.nonlinearity_ingate) = add_gate_params(ingate, 'ingate')

        (self.W_in_to_forgetgate, self.W_hid_to_forgetgate, self.b_forgetgate,
         self.nonlinearity_forgetgate) = add_gate_params(forgetgate,
                                                         'forgetgate')

        (self.W_in_to_cell, self.W_hid_to_cell, self.b_cell,
         self.nonlinearity_cell) = add_gate_params(cell, 'cell')

        (self.W_in_to_outgate, self.W_hid_to_outgate, self.b_outgate,
         self.nonlinearity_outgate) = add_gate_params(outgate, 'outgate')

        # If peephole (cell to gate) connections were enabled, initialize
        # peephole connections. These are elementwise products with the cell
        # state, so they are represented as vectors.
        if self.peepholes:
            self.W_cell_to_ingate = self.add_param(
                ingate.W_cell, (num_units, ), name="W_cell_to_ingate")

            self.W_cell_to_forgetgate = self.add_param(
                forgetgate.W_cell, (num_units, ), name="W_cell_to_forgetgate")

            self.W_cell_to_outgate = self.add_param(
                outgate.W_cell, (num_units, ), name="W_cell_to_outgate")

        # Setup initial values for the cell and the hidden units
        if isinstance(cell_init, Layer):
            self.cell_init = cell_init
        else:
            self.cell_init = self.add_param(
                cell_init, (1, num_units), name="cell_init",
                trainable=learn_init, regularizable=False)

        if isinstance(hid_init, Layer):
            self.hid_init = hid_init
        else:
            self.hid_init = self.add_param(
                hid_init, (1, self.num_units), name="hid_init",
                trainable=learn_init, regularizable=False)
    def get_output_shape_for(self, input_shapes):
        # The shape of the input to this layer will be the first element
        # of input_shapes, whether or not a mask input is being used.
        input_shape = input_shapes[0]
        # When only_return_final is true, the second (sequence step) dimension
        # will be flattened
        if self.only_return_final:
            return input_shape[0], self.num_units
        # Otherwise, the shape will be (n_batch, n_steps, num_units)
        else:
            return input_shape[0], input_shape[1], self.num_units
    def get_output_for(self, inputs, deterministic=False, **kwargs):
        """
        Compute this layer's output function given a symbolic input variable

        Parameters
        ----------
        inputs : list of theano.TensorType
            `inputs[0]` should always be the symbolic input variable. When
            this layer has a mask input (i.e. was instantiated with
            `mask_input != None`, indicating that the lengths of sequences in
            each batch vary), `inputs` should have length 2, where `inputs[1]`
            is the `mask`. The `mask` should be supplied as a Theano variable
            denoting whether each time step in each sequence in the batch is
            part of the sequence or not. `mask` should be a matrix of shape
            ``(n_batch, n_time_steps)`` where ``mask[i, j] = 1`` when ``j <=
            (length of sequence i)`` and ``mask[i, j] = 0`` when ``j > (length
            of sequence i)``. When the hidden state of this layer is to be
            pre-filled (i.e. was set to a :class:`Layer` instance), `inputs`
            should have length at least 2, and `inputs[-1]` is the hidden
            state to prefill with. When the cell state of this layer is to be
            pre-filled (i.e. was set to a :class:`Layer` instance), `inputs`
            should have length at least 2, and `inputs[-1]` is the cell state
            to prefill with. When both the cell state and the hidden state are
            being pre-filled, `inputs[-2]` is the hidden state, while
            `inputs[-1]` is the cell state.
        deterministic : bool
            If True, dropout is disabled and the layer behaves like a
            standard LSTM; if False, a fresh dropout mask is sampled for
            the recurrent connections.

        Returns
        -------
        layer_output : theano.TensorType
            Symbolic output variable.
        """
        # Retrieve the layer input
        input = inputs[0]
        # Retrieve the mask when it is supplied
        mask = None
        hid_init = None
        cell_init = None
        if self.mask_incoming_index > 0:
            mask = inputs[self.mask_incoming_index]
        if self.hid_init_incoming_index > 0:
            hid_init = inputs[self.hid_init_incoming_index]
        if self.cell_init_incoming_index > 0:
            cell_init = inputs[self.cell_init_incoming_index]

        # Treat all dimensions after the second as flattened feature
        # dimensions
        if input.ndim > 3:
            input = T.flatten(input, 3)

        # Because scan iterates over the first dimension we dimshuffle to
        # (n_time_steps, n_batch, n_features)
        input = input.dimshuffle(1, 0, 2)
        seq_len, num_batch, _ = input.shape

        # Stack input weight matrices into a (num_inputs, 4*num_units)
        # matrix, which speeds up computation
        W_in_stacked = T.concatenate(
            [self.W_in_to_ingate, self.W_in_to_forgetgate,
             self.W_in_to_cell, self.W_in_to_outgate], axis=1)

        # Same for hidden weight matrices
        W_hid_stacked = T.concatenate(
            [self.W_hid_to_ingate, self.W_hid_to_forgetgate,
             self.W_hid_to_cell, self.W_hid_to_outgate], axis=1)

        # Stack biases into a (4*num_units) vector
        b_stacked = T.concatenate(
            [self.b_ingate, self.b_forgetgate,
             self.b_cell, self.b_outgate], axis=0)
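        # With the stacked weights a single dot produces all four gate
        # pre-activations at once; slice_w (defined below) splits the result
        # back into [ingate | forgetgate | cell | outgate] chunks of
        # num_units each.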
        if self.dropout > 0 and not deterministic:
            # Sample one inverted-dropout mask of shape
            # (num_batch, num_units): keep each unit with probability
            # 1 - dropout and rescale survivors by 1/(1 - dropout), so the
            # expected activation is unchanged.
            retain_prob = T.constant(1) - self.dropout
            dropout_mask = self._srng.binomial(
                (num_batch, self.num_units), p=retain_prob,
                dtype=input.dtype) / retain_prob
        else:
            # No dropout: use an all-ones mask of the same shape.
            dropout_mask = T.ones((num_batch, self.num_units),
                                  dtype=input.dtype)
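        # Sampling the mask once and reusing it at every time step (rather
        # than resampling per step) matches the per-sequence mask idea of
        # variational RNN dropout (Gal & Ghahramani, arXiv:1512.05287), here
        # applied to the recurrent connections only.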
        if self.precompute_input:
            # Because the input is given for all time steps, we can
            # precompute the input dot the weight matrices before scanning.
            # W_in_stacked is (n_features, 4*num_units). input is then
            # (n_time_steps, n_batch, 4*num_units).
            input = T.dot(input, W_in_stacked) + b_stacked

        # At each call to scan, input_n will be (n_batch, 4*num_units).
        # We define a slicing function that extracts the input to each
        # LSTM gate.
        def slice_w(x, n):
            return x[:, n*self.num_units:(n+1)*self.num_units]
        # Create single recurrent computation step function
        # input_n is the n'th vector of the input
        def step(input_n, cell_previous, hid_previous, *args):
            if not self.precompute_input:
                input_n = T.dot(input_n, W_in_stacked) + b_stacked

            # Calculate gate pre-activations and slice. The dropout mask is
            # applied to the previous hidden state, i.e. to the recurrent
            # connections only, and the same mask is reused at every step.
            gates = input_n + T.dot(hid_previous * dropout_mask,
                                    W_hid_stacked)

            # Clip gradients
            if self.grad_clipping:
                gates = theano.gradient.grad_clip(
                    gates, -self.grad_clipping, self.grad_clipping)

            # Extract the pre-activation gate values
            ingate = slice_w(gates, 0)
            forgetgate = slice_w(gates, 1)
            cell_input = slice_w(gates, 2)
            outgate = slice_w(gates, 3)

            if self.peepholes:
                # Compute peephole connections
                ingate += cell_previous*self.W_cell_to_ingate
                forgetgate += cell_previous*self.W_cell_to_forgetgate

            # Apply nonlinearities
            ingate = self.nonlinearity_ingate(ingate)
            forgetgate = self.nonlinearity_forgetgate(forgetgate)
            cell_input = self.nonlinearity_cell(cell_input)

            # Compute new cell value
            cell = forgetgate*cell_previous + ingate*cell_input

            if self.peepholes:
                outgate += cell*self.W_cell_to_outgate
            outgate = self.nonlinearity_outgate(outgate)

            # Compute new hidden unit activation
            hid = outgate*self.nonlinearity(cell)
            return [cell, hid]

        def step_masked(input_n, mask_n, cell_previous, hid_previous, *args):
            cell, hid = step(input_n, cell_previous, hid_previous, *args)

            # Skip over any input with mask 0 by copying the previous
            # hidden state; proceed normally for any input with mask 1.
            cell = T.switch(mask_n, cell, cell_previous)
            hid = T.switch(mask_n, hid, hid_previous)

            return [cell, hid]
        if mask is not None:
            # mask is given as (batch_size, seq_len). Because scan iterates
            # over first dimension, we dimshuffle to (seq_len, batch_size) and
            # add a broadcastable dimension
            mask = mask.dimshuffle(1, 0, 'x')
            sequences = [input, mask]
            step_fun = step_masked
        else:
            sequences = input
            step_fun = step

        ones = T.ones((num_batch, 1))
        if not isinstance(self.cell_init, Layer):
            # Dot against a 1s vector to repeat to shape
            # (num_batch, num_units)
            cell_init = T.dot(ones, self.cell_init)

        if not isinstance(self.hid_init, Layer):
            # Dot against a 1s vector to repeat to shape
            # (num_batch, num_units)
            hid_init = T.dot(ones, self.hid_init)
        # The hidden-to-hidden weight matrix and the dropout mask are always
        # used in step. Passing the mask as a non-sequence ensures it is
        # sampled once per forward pass, not once per time step.
        non_seqs = [W_hid_stacked, dropout_mask]
        # The "peephole" weight matrices are only used when
        # self.peepholes=True
        if self.peepholes:
            non_seqs += [self.W_cell_to_ingate,
                         self.W_cell_to_forgetgate,
                         self.W_cell_to_outgate]

        # When we aren't precomputing the input outside of scan, we need to
        # provide the input weights and biases to the step function
        if not self.precompute_input:
            non_seqs += [W_in_stacked, b_stacked]
        if self.unroll_scan:
            # Retrieve the dimensionality of the incoming layer
            input_shape = self.input_shapes[0]
            # Explicitly unroll the recurrence instead of using scan
            cell_out, hid_out = unroll_scan(
                fn=step_fun,
                sequences=sequences,
                outputs_info=[cell_init, hid_init],
                go_backwards=self.backwards,
                non_sequences=non_seqs,
                n_steps=input_shape[1])
        else:
            # Scan op iterates over first dimension of input and repeatedly
            # applies the step function
            cell_out, hid_out = theano.scan(
                fn=step_fun,
                sequences=sequences,
                outputs_info=[cell_init, hid_init],
                go_backwards=self.backwards,
                truncate_gradient=self.gradient_steps,
                non_sequences=non_seqs,
                strict=True)[0]

        # When it is requested that we only return the final sequence step,
        # we need to slice it out immediately after scan is applied
        if self.only_return_final:
            hid_out = hid_out[-1]
        else:
            # dimshuffle back to (n_batch, n_time_steps, n_features)
            hid_out = hid_out.dimshuffle(1, 0, 2)

            # if scan is backward, reverse the output
            if self.backwards:
                hid_out = hid_out[:, ::-1]

        return hid_out
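
A minimal usage sketch follows, assuming a toy setup: the layer names, shapes, and the 0.5 dropout rate are arbitrary. It builds a small network and compiles separate stochastic and deterministic passes, since the dropout mask is only sampled when `deterministic=False`.

import theano
import theano.tensor as T
import lasagne
from lasagne.layers import InputLayer, DenseLayer, get_output

# Toy shapes: batches of 20-step sequences with 8 features each.
l_in = InputLayer((None, 20, 8))
l_lstm = DropoutLSTMLayer(l_in, num_units=32, dropout=0.5,
                          only_return_final=True)
l_out = DenseLayer(l_lstm, num_units=2,
                   nonlinearity=lasagne.nonlinearities.softmax)

x = T.tensor3('x')
# Training pass: dropout active, one fresh mask per compiled function call.
train_out = get_output(l_out, x)
# Evaluation pass: deterministic=True disables the dropout mask.
eval_out = get_output(l_out, x, deterministic=True)
f_eval = theano.function([x], eval_out)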
Proposed solution: https://gist.github.com/df7d8e03aa3582f06e4481345898850b.git