@ktnyt · Last active February 22, 2019 23:04
Naive implementation of a recursive autoencoder
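
A recursive autoencoder folds a sequence of d-dimensional vectors into a single d-dimensional code by repeatedly encoding the previous code together with the next input vector. In terms of the parameters defined below (W, b for the encoder; U, c for the decoder), each step of the scan computes, roughly:

    origin_t = [code_{t-1}; x_t]              # concatenated pair, 2d wide
    code_t   = sigmoid(origin_t . W + b)      # encoded back down to d dims
    recon_t  = code_t . U + c                 # linear reconstruction of the pair

and the training loss is the mean squared error between recon_t and origin_t over all steps.
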
import numpy as np
import theano
import theano.tensor as T
class RAE(object):
    def __init__(self, input, rng, d, W=None, b=None, U=None, c=None):
        dv = d      # width of the code produced at each step
        dh = d * 2  # width of a concatenated [previous code; input] pair
        self.dv = dv
        self.dh = dh
        self.initx = input[0]   # the first row seeds the recursion
        self.input = input[1:]  # the remaining rows are folded in one by one

        # Random-uniform initialization in the usual range for sigmoid layers.
        if W is None:
            W = np.asarray(rng.uniform(
                low=-4 * np.sqrt(6. / (dv + dh)),
                high=4 * np.sqrt(6. / (dv + dh)),
                size=(dh, dv)), dtype=theano.config.floatX)
        if U is None:
            U = np.asarray(rng.uniform(
                low=-4 * np.sqrt(6. / (dv + dh)),
                high=4 * np.sqrt(6. / (dv + dh)),
                size=(dv, dh)), dtype=theano.config.floatX)
        if b is None:
            b = np.zeros((dv,), dtype=theano.config.floatX)
        if c is None:
            c = np.zeros((dh,), dtype=theano.config.floatX)

        self.W = theano.shared(value=W, name="W")
        self.U = theano.shared(value=U, name="U")
        self.b = theano.shared(value=b, name="b")
        self.c = theano.shared(value=c, name="c")
        self.params = [self.W, self.b, self.U, self.c]

        # Zero-valued shared accumulators, one per parameter (unused below).
        self.updates = []
        for param in self.params:
            init = np.zeros(param.get_value(borrow=True).shape,
                            dtype=theano.config.floatX)
            self.updates.append((param, theano.shared(init)))
        def step(x1, x2):
            # x2 carries the code from the previous step (seeded with the
            # first input row); x1 is the next input row to fold in.
            origin = T.concatenate([x2, x1], axis=0)
            encode = self.encode(origin)
            decode = self.decode(encode)
            return encode, decode, origin

        # Only the encoding is recurrent; decode and origin are collected
        # per step for the reconstruction loss.
        [encode, decode, origin], _ = theano.scan(
            fn=step,
            sequences=self.input,
            outputs_info=[self.initx, None, None]
        )

        # Regularizers (defined for convenience; not added to the loss here).
        self.L1 = abs(self.W).sum() + abs(self.U).sum()
        self.L2_sqr = (self.W ** 2).sum() + (self.U ** 2).sum()
        self.loss = self.error(origin, decode)
    def encode(self, origin):
        # Compress a 2d-wide pair down to a d-wide code.
        return T.nnet.sigmoid(T.dot(origin, self.W) + self.b)

    def decode(self, encode):
        # Linearly reconstruct the 2d-wide pair from the code.
        return T.dot(encode, self.U) + self.c

    def error(self, origin, decode):
        # Mean squared reconstruction error across all fold steps.
        return T.mean((decode - origin) ** 2)
if __name__ == '__main__':
    rng = np.random.RandomState(1234)
    d = 8

    X = T.dmatrix(name="X")
    rae = RAE(input=X, rng=rng, d=d)
    cost = rae.loss

    # Plain gradient descent on every parameter.
    l_r = 0.2
    gparams = []
    for param in rae.params:
        gparam = T.grad(cost, param)
        gparams.append(gparam)
    updates = []
    for param, gparam in zip(rae.params, gparams):
        update = l_r * gparam
        updates.append((param, param - update))

    index = T.lscalar(name="index")  # unused

    # Train on the rows of an identity matrix (one-hot vectors).
    data = np.eye(d)

    # Evaluates the loss without updating the parameters (unused below).
    train_error = theano.function(
        inputs=[],
        outputs=rae.loss,
        givens={
            X: data
        }
    )
    train = theano.function(
        inputs=[],
        outputs=cost,
        updates=updates,
        givens={
            X: data
        }
    )
    init_W = rae.W.get_value()
    for epoch in range(50000):
        loss = train()
        print("%d\t%f" % (epoch + 1, loss))

    # Show how much the encoder weights moved during training.
    print(init_W)
    print(rae.W.get_value())

    # Fold the first three rows by hand and inspect each code/reconstruction.
    print("encode step 1")
    encode = rae.encode(T.concatenate([data[0], data[1]], axis=0))
    print(encode.eval())
    print("decode step 1")
    decode = rae.decode(encode)
    print(decode.eval())
    print("encode step 2")
    encode = rae.encode(T.concatenate([encode, data[2]], axis=0))
    print(encode.eval())
    print("decode step 2")
    decode = rae.decode(encode)
    print(decode.eval())
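
For reference, folding an entire matrix into one code just iterates the same step the demo performs by hand above. A minimal sketch, reusing rae and data from the script (the fold helper is my naming, not part of the gist):

    def fold(rae, data):
        # Repeatedly encode [previous code; next row], as in the scan step.
        code = data[0]
        for row in data[1:]:
            code = rae.encode(T.concatenate([code, row], axis=0))
        return code

    print(fold(rae, data).eval())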