# (Deprecated) Peephole-connected LSTM in Chainer -> (recommend) [the recommended replacement is not named in this header]
import numpy
import six
from chainer import cuda
from chainer import function
from chainer.utils import type_check
def _extract_gates(x):
    """Split the gate pre-activations in ``x`` into three per-gate arrays.

    The second axis of ``x`` is regrouped as ``(units, 3)``, so the three
    consecutive channels belonging to one unit become that unit's three
    gates.  ``backward_cpu`` fills its gradient buffer by assigning through
    the slices returned here.

    Args:
        x (array): Array whose second axis holds ``3 * units`` entries.

    Returns:
        generator: Yields the three slices ``r[:, :, k]`` for ``k = 0, 1, 2``
        of the reshaped array, each of shape
        ``(x.shape[0], units) + x.shape[2:]``.
    """
    # Floor division keeps the unit count an integer.  True division gives a
    # float on Python 3, which ``reshape`` rejects as a dimension.
    n_units = x.shape[1] // 3
    r = x.reshape((x.shape[0], n_units, 3) + x.shape[2:])
    return (r[:, :, i] for i in range(3))
def _sigmoid(x):
    """Evaluate the logistic function ``1 / (1 + exp(-x))`` element-wise."""
    denominator = 1 + numpy.exp(-x)
    return 1 / denominator
def _grad_sigmoid(x):
    """Return ``x * (1 - x)``.

    The callers in this file pass an already-activated sigmoid output, for
    which this expression is the sigmoid's derivative.
    """
    complement = 1 - x
    return x * complement
def _grad_tanh(x):
    """Return ``1 - x * x``.

    The callers in this file pass an already-computed tanh output, for which
    this expression is tanh's derivative.
    """
    squared = x * x
    return 1 - squared
# CUDA source shared by the forward and backward kernels: three device helper
# functions plus the COMMON_ROUTINE macro.  The macro expects ``i``, ``a``,
# ``x`` and ``rsize`` to be in scope and defines I, J and the activated values
# aa (tanh of the cell input) and ai / af / ao (sigmoid of the three gates).
# The trailing backslashes are Python string line continuations, so the macro
# body reaches the CUDA compiler as one logical line.
# NOTE(review): the macro header line and the closing quotes were missing from
# this file; the name COMMON_ROUTINE follows the upstream Chainer LSTM source
# -- confirm.
_preamble = '''
__device__ float sigmoid(float x) { return 1 / (1 + __expf(-x)); }
__device__ float grad_sigmoid(float y) { return y * (1 - y); }
__device__ float grad_tanh(float y) { return 1 - y * y; }

#define COMMON_ROUTINE \
    int I = i / rsize; \
    int J = i % rsize; \
    const float* a_i = a + I * 1 * rsize; \
    const float* x_i = x + I * 3 * rsize; \
    float aa =   tanhf(a_i[          J]); \
    float ai = sigmoid(x_i[          J]); \
    float af = sigmoid(x_i[1*rsize + J]); \
    float ao = sigmoid(x_i[2*rsize + J]);
'''


class PeepholeLSTM(function.Function):

    """Long short-term memory unit with forget gate and separate gate input.

    It has three inputs (c, a, x) and two outputs (c, h), where c indicates
    the cell state, a the cell input and x the gate inputs. a must have the
    same shape as c, and x must have three times as many channels as c.

    """

    def check_type_forward(self, in_types):
        """Validate dtype, rank and shape of the three inputs."""
        type_check.expect(in_types.size() == 3)
        c_type, cell_in_type, gate_in_type = in_types

        type_check.expect(
            c_type.dtype == numpy.float32,
            cell_in_type.dtype == numpy.float32,
            gate_in_type.dtype == numpy.float32,

            c_type.ndim >= 2,
            cell_in_type.ndim >= 2,
            gate_in_type.ndim >= 2,
            c_type.ndim == gate_in_type.ndim,
            c_type.ndim == cell_in_type.ndim,

            cell_in_type.shape == c_type.shape,
            gate_in_type.shape[0] == c_type.shape[0],
            gate_in_type.shape[1] == 3 * c_type.shape[1],
        )
        # Every trailing axis must match the cell state's.
        for i in range(2, c_type.ndim.eval()):
            type_check.expect(gate_in_type.shape[i] == c_type.shape[i])
            type_check.expect(cell_in_type.shape[i] == c_type.shape[i])

    def forward_cpu(self, inputs):
        """Compute ``(c, h)`` with NumPy and cache the activations.

        The activated values are stored on ``self`` (``a``, ``i``, ``f``,
        ``o``, ``c``) because ``backward_cpu`` reads them.
        """
        c_prev, a, x = inputs
        i, f, o = _extract_gates(x)

        self.a = numpy.tanh(a)
        self.i = _sigmoid(i)
        self.f = _sigmoid(f)
        self.o = _sigmoid(o)

        self.c = self.a * self.i + self.f * c_prev
        h = self.o * numpy.tanh(self.c)
        return self.c, h

    def backward_cpu(self, inputs, grad_outputs):
        """Return gradients w.r.t. ``(c_prev, a, x)`` using cached activations.

        Either entry of ``grad_outputs`` may be ``None``; it is then treated
        as a zero gradient.
        """
        c_prev, a, x = inputs
        gc, gh = grad_outputs

        ga = numpy.empty_like(a)
        gx = numpy.empty_like(x)
        # The gate gradients are written through these slices into ``gx``.
        gi, gf, go = _extract_gates(gx)

        # Consider the case that either gradient is not given
        if gc is None:
            gc = 0
        if gh is None:
            gh = 0

        co = numpy.tanh(self.c)
        gc_prev = gh * self.o * _grad_tanh(co) + gc  # multiply f later
        ga[:] = gc_prev * self.i * _grad_tanh(self.a)
        gi[:] = gc_prev * self.a * _grad_sigmoid(self.i)
        gf[:] = gc_prev * c_prev * _grad_sigmoid(self.f)
        go[:] = gh * co * _grad_sigmoid(self.o)
        gc_prev *= self.f  # multiply f here

        return gc_prev, ga, gx

    def forward_gpu(self, inputs):
        """Compute ``(c, h)`` with an element-wise CUDA kernel.

        ``lsize`` is the product of the first two axes of ``c_prev`` and
        ``rsize`` the number of remaining elements per (sample, unit) pair.
        Only ``self.c`` is cached; the backward kernel recomputes the gate
        activations from ``a`` and ``x``.
        """
        c_prev, a, x = inputs
        lsize = c_prev.shape[0] * c_prev.shape[1]
        rsize = c_prev.size // lsize

        self.c = cuda.empty_like(c_prev)
        h = cuda.empty_like(c_prev)
        cuda.elementwise(
            '''float* c, float* h, const float* c_prev, const float* a,
               const float* x, int lsize, int rsize''',
            '''COMMON_ROUTINE;
               c[i] = aa * ai + af * c_prev[i];
               h[i] = ao * tanhf(c[i]);''',
            'lstm_fwd', preamble=_preamble)(
                self.c, h, c_prev, a, x, lsize, rsize)

        return self.c, h

    def backward_gpu(self, inputs, grad_outputs):
        """Return gradients w.r.t. ``(c_prev, a, x)`` via a CUDA kernel.

        A missing output gradient is signalled to the kernel by passing
        ``self.c`` in its place; the kernel compares the pointers and
        substitutes zero.
        """
        c_prev, a, x = inputs
        gc, gh = grad_outputs
        lsize = c_prev.shape[0] * c_prev.shape[1]
        rsize = c_prev.size // lsize

        # Odd rule to determine whether the gradient is given or not.
        if gc is None:
            gc = self.c
        if gh is None:
            gh = self.c

        gc_prev = cuda.empty_like(c_prev)
        ga = cuda.empty_like(a)
        gx = cuda.empty_like(x)
        cuda.elementwise(
            '''
               float* gc_prev, float* ga, float* gx, const float* c_prev,
               const float* a, const float* x,
               const float* c, const float* gc, const float* gh, int lsize,
               int rsize
            ''', '''
               COMMON_ROUTINE;
               float* gx_i = gx + I * 3 * rsize;
               float* ga_i = ga + I * 1 * rsize;
               float& ga_ = ga_i[J];
               float& gi  = gx_i[          J];
               float& gf  = gx_i[  rsize + J];
               float& go  = gx_i[2*rsize + J];

               float co = tanhf(c[i]);
               // Odd rule: if gh == c [gc == c] then gh [gc] is not given,
               // since we cannot pass null pointer to the kernel through
               // PyCUDA.
               float gc1 = (gh == c ? 0 : gh[i] * ao * grad_tanh(co))
                         + (gc == c ? 0 : gc[i]);
               go        =  gh == c ? 0 : gh[i] * co * grad_sigmoid(ao);

               gc_prev[i] = gc1 * af;
               ga_        = gc1 * ai        * grad_tanh(aa);
               gi         = gc1 * aa        * grad_sigmoid(ai);
               gf         = gc1 * c_prev[i] * grad_sigmoid(af);
            ''',
            'lstm_bwd', preamble=_preamble)(
                gc_prev, ga, gx, c_prev, a, x, self.c, gc, gh, lsize, rsize)

        return gc_prev, ga, gx
def peephole_lstm(c_prev, a, x):
    """Long Short-Term Memory units as an activation function.

    This function implements LSTM units with forget gates. Let the previous
    cell state :math:`c_{\\text{prev}}`, the cell input :math:`a` and the
    incoming gate signal :math:`x`.

    First, the incoming signal :math:`x` is split into 3 arrays
    :math:`i, f, o` of the same shapes along the second axis.
    It means that :math:`x` 's second axis must have 3 times the length of
    :math:`c_{\\text{prev}}` 's second axis.

    The :math:`a` and split input signals correspond to:

        - :math:`a` : sources of cell input
        - :math:`i` : sources of input gate
        - :math:`f` : sources of forget gate
        - :math:`o` : sources of output gate

    Second, it computes outputs as:

    .. math::

        c &= \\tanh(a) \\text{sigmoid}(i)
           + c_{\\text{prev}} \\text{sigmoid}(f), \\\\
        h &= \\tanh(c) \\text{sigmoid}(o).

    These are returned as a tuple of two variables.

    Args:
        c_prev (~chainer.Variable): Variable that holds the previous cell
            state. The cell state should be a zero array or the output of the
            previous call of LSTM.
        a (~chainer.Variable): Variable that holds the incoming signal into
            the cell. It must have the same shape as the cell state.
        x (~chainer.Variable): Variable that holds the incoming signal into
            the 3 gates. Its second dimension must be 3 times that of the
            cell state.

    Returns:
        tuple: Two :class:`~chainer.Variable` objects ``c`` and ``h``. ``c`` is
            the updated cell state. ``h`` indicates the outgoing signal.

    See the original paper proposing LSTM with forget gates:
    *Long Short-Term Memory in Recurrent Neural Networks*.

    .. admonition:: Example

        Assuming ``y`` is the current input signal, ``c`` is the previous cell
        state, and ``h`` is the previous output signal from an ``lstm``
        function. Each of ``y``, ``c`` and ``h`` has ``n_units`` channels.
        Most typical preparation of ``a, x`` is:

        >>> model = FunctionSet(p=F.Linear(n_units, n_units),
        ...                     q=F.Linear(n_units, n_units, nobias=True),
        ...                     w=F.Linear(n_units, 3 * n_units),
        ...                     v=F.Linear(n_units, 3 * n_units, nobias=True),
        ...                     u=F.Linear(n_units, 3 * n_units, nobias=True),
        ...                     ...)
        >>> a = model.p(y) + model.q(h)
        >>> x = model.w(y) + model.v(h) + model.u(c)
        >>> c, h = F.peephole_lstm(c, a, x)

        It corresponds to calculating the cell input source :math:`a` from
        the current input ``y`` and the previous output ``h``, and the gate
        sources :math:`i, f, o` from ``y``, ``h`` and the previous cell state
        ``c``. Different parameters are used for different kinds of input
        sources.

    """
    return PeepholeLSTM()(c_prev, a, x)
import unittest
import numpy
import chainer
from chainer import cuda
from chainer import gradient_check
from chainer import testing
from chainer.testing import attr
from chainer.testing import condition
from src.functions.peephole_lstm import peephole_lstm
if cuda.available:
    # NOTE(review): the guarded statement was missing from this file, which
    # left the ``if`` without a body.  Chainer test modules of this era
    # initialise CUDA here -- confirm against the Chainer version in use.
    cuda.init()
def _sigmoid(x):
    """Reference logistic function used to build the expected outputs."""
    exp_neg = numpy.exp(-x)
    return 1 / (1 + exp_neg)
class TestPeepholeLSTM(unittest.TestCase):

    """Forward and backward checks for ``peephole_lstm`` on CPU and GPU.

    NOTE(review): several statements in this class were missing or truncated
    (``self.gh``, the ``.data`` accesses, ``c.backward()``, the final
    comparisons, the bodies of the flat/GPU tests and the decorators).  They
    are restored here following the layout of Chainer's own LSTM test; the
    retry count of 3 is an assumption -- confirm.
    """

    def setUp(self):
        shape = (3, 2, 4)
        gate_shape = (3, 6, 4)  # three gates per unit along the second axis
        self.c_prev = numpy.random.uniform(-1, 1, shape).astype(numpy.float32)
        self.a = numpy.random.uniform(-1, 1, shape).astype(numpy.float32)
        self.x = numpy.random.uniform(
            -1, 1, gate_shape).astype(numpy.float32)

        self.gc = numpy.random.uniform(-1, 1, shape).astype(numpy.float32)
        self.gh = numpy.random.uniform(-1, 1, shape).astype(numpy.float32)

    def flat(self):
        """Drop the trailing axis so the 2-D code path is exercised."""
        self.c_prev = self.c_prev[:, :, 0].copy()
        self.a = self.a[:, :, 0].copy()
        self.x = self.x[:, :, 0].copy()
        self.gc = self.gc[:, :, 0].copy()
        self.gh = self.gh[:, :, 0].copy()

    def check_forward(self, c_prev_data, a_data, x_data):
        c_prev = chainer.Variable(c_prev_data)
        a = chainer.Variable(a_data)
        x = chainer.Variable(x_data)
        c, h = peephole_lstm(c_prev, a, x)
        self.assertEqual(c.data.dtype, numpy.float32)
        self.assertEqual(h.data.dtype, numpy.float32)

        # Compute expected out
        a_in = self.a
        i_in = self.x[:, [0, 3]]
        f_in = self.x[:, [1, 4]]
        o_in = self.x[:, [2, 5]]

        c_expect = _sigmoid(i_in) * numpy.tanh(a_in) + \
            _sigmoid(f_in) * self.c_prev
        h_expect = _sigmoid(o_in) * numpy.tanh(c_expect)

        # Without these comparisons the expected values are never used.
        gradient_check.assert_allclose(c_expect, c.data)
        gradient_check.assert_allclose(h_expect, h.data)

    def _forward_gpu(self):
        self.check_forward(
            cuda.to_gpu(self.c_prev), cuda.to_gpu(self.a),
            cuda.to_gpu(self.x))

    @condition.retry(3)
    def test_forward_cpu(self):
        self.check_forward(self.c_prev, self.a, self.x)

    @condition.retry(3)
    def test_flat_forward_cpu(self):
        self.flat()
        self.check_forward(self.c_prev, self.a, self.x)

    @attr.gpu
    @condition.retry(3)
    def test_forward_gpu(self):
        self._forward_gpu()

    @attr.gpu
    @condition.retry(3)
    def test_flat_forward_gpu(self):
        self.flat()
        self._forward_gpu()

    def check_backward(self, c_prev_data, a_data, x_data, c_grad, h_grad):
        c_prev = chainer.Variable(c_prev_data)
        a = chainer.Variable(a_data)
        x = chainer.Variable(x_data)
        c, h = peephole_lstm(c_prev, a, x)
        c.grad = c_grad
        h.grad = h_grad
        # Populate the input gradients that are compared below.
        c.backward()

        func = c.creator
        in_data = (c_prev.data, a.data, x.data)

        def f():
            return func.forward(in_data)

        gc_prev, ga, gx = gradient_check.numerical_grad(
            f, in_data, (c_grad, h_grad), eps=1e-2)

        gradient_check.assert_allclose(gc_prev, c_prev.grad)
        gradient_check.assert_allclose(ga, a.grad)
        gradient_check.assert_allclose(gx, x.grad)

    def _backward_cpu(self, with_gc=True, with_gh=True):
        self.check_backward(
            self.c_prev, self.a, self.x,
            self.gc if with_gc else None,
            self.gh if with_gh else None)

    def _backward_gpu(self, with_gc=True, with_gh=True):
        self.check_backward(
            cuda.to_gpu(self.c_prev), cuda.to_gpu(self.a), cuda.to_gpu(self.x),
            cuda.to_gpu(self.gc) if with_gc else None,
            cuda.to_gpu(self.gh) if with_gh else None)

    @condition.retry(3)
    def test_full_backward_cpu(self):
        self._backward_cpu()

    @condition.retry(3)
    def test_flat_full_backward_cpu(self):
        self.flat()
        self._backward_cpu()

    @condition.retry(3)
    def test_no_gc_backward_cpu(self):
        self._backward_cpu(with_gc=False)

    @condition.retry(3)
    def test_flat_no_gc_backward_cpu(self):
        self.flat()
        self._backward_cpu(with_gc=False)

    @condition.retry(3)
    def test_no_gh_backward_cpu(self):
        self._backward_cpu(with_gh=False)

    @condition.retry(3)
    def test_flat_no_gh_backward_cpu(self):
        self.flat()
        self._backward_cpu(with_gh=False)

    @attr.gpu
    @condition.retry(3)
    def test_full_backward_gpu(self):
        self._backward_gpu()

    @attr.gpu
    @condition.retry(3)
    def test_flat_full_backward_gpu(self):
        self.flat()
        self._backward_gpu()

    @attr.gpu
    @condition.retry(3)
    def test_no_gc_backward_gpu(self):
        self._backward_gpu(with_gc=False)

    @attr.gpu
    @condition.retry(3)
    def test_flat_no_gc_backward_gpu(self):
        self.flat()
        self._backward_gpu(with_gc=False)

    @attr.gpu
    @condition.retry(3)
    def test_no_gh_backward_gpu(self):
        self._backward_gpu(with_gh=False)

    @attr.gpu
    @condition.retry(3)
    def test_flat_no_gh_backward_gpu(self):
        self.flat()
        self._backward_gpu(with_gh=False)
# Hand this module to Chainer's test-runner helper.
# NOTE(review): presumably this only runs the tests when the file is executed
# as a script -- confirm against chainer.testing.run_module.
testing.run_module(__name__, __file__)
