Skip to content

Instantly share code, notes, and snippets.

@skaae
Created January 11, 2016 17:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save skaae/0ca0c33d0fc908f247d3 to your computer and use it in GitHub Desktop.
Save skaae/0ca0c33d0fc908f247d3 to your computer and use it in GitHub Desktop.
import numpy as np
import theano
import theano.tensor as T
from lasagne import nonlinearities
from lasagne import init
from lasagne.utils import unroll_scan
from lasagne.layers import MergeLayer, Layer, InputLayer, DenseLayer
from lasagne.layers import helper
from lasagne.layers import Gate
class GRUCell(MergeLayer):
r"""
Gated Recurrent Unit (GRU) Layer
Implements the recurrent step proposed in [1]_, which computes the output
by
.. math ::
r_t &= \sigma_r(x_t W_{xr} + h_{t - 1} W_{hr} + b_r)\\
u_t &= \sigma_u(x_t W_{xu} + h_{t - 1} W_{hu} + b_u)\\
c_t &= \sigma_c(x_t W_{xc} + r_t \odot (h_{t - 1} W_{hc}) + b_c)\\
h_t &= (1 - u_t) \odot h_{t - 1} + u_t \odot c_t
Parameters
----------
incoming : a :class:`lasagne.layers.Layer` instance or a tuple
The layer feeding into this layer, or the expected input shape.
num_units_gru : int
Number of hidden units in the layer.
resetgate : Gate
Parameters for the reset gate (:math:`r_t`): :math:`W_{xr}`,
:math:`W_{hr}`, :math:`b_r`, and :math:`\sigma_r`.
updategate : Gate
Parameters for the update gate (:math:`u_t`): :math:`W_{xu}`,
:math:`W_{hu}`, :math:`b_u`, and :math:`\sigma_u`.
hidden_update : Gate
Parameters for the hidden update (:math:`c_t`): :math:`W_{xc}`,
:math:`W_{hc}`, :math:`b_c`, and :math:`\sigma_c`.
hid_init : callable, np.ndarray, theano.shared or :class:`Layer`
Initializer for initial hidden state (:math:`h_0`).
learn_init : bool
If True, initial hidden values are learned.
grad_clipping : float
If nonzero, the gradient messages are clipped to the given value during
the backward pass. See [1]_ (p. 6) for further explanation.
References
----------
.. [1] Cho, Kyunghyun, et al: On the properties of neural
machine translation: Encoder-decoder approaches.
arXiv preprint arXiv:1409.1259 (2014).
.. [2] Chung, Junyoung, et al.: Empirical Evaluation of Gated
Recurrent Neural Networks on Sequence Modeling.
arXiv preprint arXiv:1412.3555 (2014).
.. [3] Graves, Alex: "Generating sequences with recurrent neural networks."
arXiv preprint arXiv:1308.0850 (2013).
Notes
-----
An alternate update for the candidate hidden state is proposed in [2]_:
.. math::
c_t &= \sigma_c(x_t W_{ic} + (r_t \odot h_{t - 1})W_{hc} + b_c)\\
We use the formulation from [1]_ because it allows us to do all matrix
operations in a single dot product.
"""
def __init__(self, x, hid_previous, num_units,
resetgate=Gate(W_cell=None),
updategate=Gate(W_cell=None),
hidden_update=Gate(W_cell=None,
nonlinearity=nonlinearities.tanh),
hid_init=init.Constant(0.),
learn_init=False,
grad_clipping=0,
**kwargs):
if hid_previous.output_shape[-1] != num_units:
raise ValueError('Number of hid_previous inputs should be the '
'same as num_units_gru')
if x.output_shape[0] != hid_previous.output_shape[0]:
raise ValueError('first dimension output of x and hid_previous '
'should be equal')
# Initialize parent layer
super(GRUCell, self).__init__([x, hid_previous], **kwargs)
self.learn_init = learn_init
self.num_units = num_units # this could also be inferred?
self.grad_clipping = grad_clipping
self.unroll_scan = unroll_scan
# Retrieve the dimensionality of the incoming layer
input_shape_x = self.input_shapes[0]
input_shape_h = self.input_shapes[1]
# Input dimensionality is the output dimensionality of the input layer
num_inputs_x = np.prod(input_shape_x[1:])
def add_gate_params(gate, gate_name):
""" Convenience function for adding layer parameters from a Gate
instance. """
return (self.add_param(gate.W_in, (num_inputs_x, num_units),
name="W_in_to_{}".format(gate_name)),
self.add_param(gate.W_hid, (num_units, num_units),
name="W_hid_to_{}".format(gate_name)),
self.add_param(gate.b, (num_units,),
name="b_{}".format(gate_name),
regularizable=False),
gate.nonlinearity)
# Add in all parameters from gates
(self.W_in_to_updategate, self.W_hid_to_updategate, self.b_updategate,
self.nonlinearity_updategate) = add_gate_params(updategate,
'updategate')
(self.W_in_to_resetgate, self.W_hid_to_resetgate, self.b_resetgate,
self.nonlinearity_resetgate) = add_gate_params(resetgate, 'resetgate')
(self.W_in_to_hidden_update, self.W_hid_to_hidden_update,
self.b_hidden_update, self.nonlinearity_hid) = add_gate_params(
hidden_update, 'hidden_update')
# Initialize hidden state
self.hid_init = self.add_param(
hid_init, (1, self.num_units), name="hid_init",
trainable=learn_init, regularizable=False)
# Stack input weight matrices into a (num_inputs, 3*num_units_gru)
# matrix, which speeds up computation
self.W_in_stacked = T.concatenate(
[self.W_in_to_resetgate, self.W_in_to_updategate,
self.W_in_to_hidden_update], axis=1)
# Same for hidden weight matrices
self.W_hid_stacked = T.concatenate(
[self.W_hid_to_resetgate, self.W_hid_to_updategate,
self.W_hid_to_hidden_update], axis=1)
# Stack gate biases into a (3*num_units_gru) vector
self.b_stacked = T.concatenate(
[self.b_resetgate, self.b_updategate,
self.b_hidden_update], axis=0)
def get_hid_init(self, num_batch):
return T.dot(T.ones((num_batch, 1)), self.hid_init)
def get_output_shape_for(self, input_shapes):
# The shape of the input to this layer will be the first element
# of input_shapes, whether or not a mask input is being used.
input_shape = input_shapes[0]
return input_shape[0], self.num_units
def get_output_for(self, inputs, **kwargs):
"""
TODO:
inputs: [x_t, h_previous]
"""
# Retrieve the layer input
input_n, hid_previous = inputs
# Treat all dimensions after the second as flattened feature dimensions
if input_n.ndim > 2:
input_n = T.flatten(input_n, 2)
# At each call to scan, input_n will be (n_time_steps, 3*num_units_gru).
# We define a slicing function that extract the input to each GRU gate
def slice_w(x, n):
return x[:, n*self.num_units:(n+1)*self.num_units]
# Create single recurrent computation step function
# input__n is the n'th vector of the input
def step(input_n, hid_previous, *args):
# Compute W_{hr} h_{t - 1}, W_{hu} h_{t - 1}, and W_{hc} h_{t - 1}
hid_input = T.dot(hid_previous, self.W_hid_stacked)
if self.grad_clipping:
input_n = theano.gradient.grad_clip(
input_n, -self.grad_clipping, self.grad_clipping)
hid_input = theano.gradient.grad_clip(
hid_input, -self.grad_clipping, self.grad_clipping)
input_n = T.dot(input_n, self.W_in_stacked) + self.b_stacked
# Reset and update gates
resetgate = slice_w(hid_input, 0) + slice_w(input_n, 0)
updategate = slice_w(hid_input, 1) + slice_w(input_n, 1)
resetgate = self.nonlinearity_resetgate(resetgate)
updategate = self.nonlinearity_updategate(updategate)
# Compute W_{xc}x_t + r_t \odot (W_{hc} h_{t - 1})
hidden_update_in = slice_w(input_n, 2)
hidden_update_hid = slice_w(hid_input, 2)
hidden_update = hidden_update_in + resetgate*hidden_update_hid
if self.grad_clipping:
hidden_update = theano.gradient.grad_clip(
hidden_update, -self.grad_clipping, self.grad_clipping)
hidden_update = self.nonlinearity_hid(hidden_update)
# Compute (1 - u_t)h_{t - 1} + u_t c_t
hid = (1 - updategate)*hid_previous + updategate*hidden_update
return hid
return step(input_n, hid_previous)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment