Last active
February 22, 2019 23:04
Naive implementation of a recursive autoencoder
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import theano | |
import theano.tensor as T | |
class RAE(object):
    """Naive recursive autoencoder (RAE) built on Theano.

    Row 0 of ``input`` seeds the recursion; each following row is
    concatenated with the previous encoding (a 2*d-vector), squashed back
    to d dimensions by ``encode`` and linearly reconstructed by ``decode``.
    ``self.loss`` is the mean squared reconstruction error over all steps.
    """

    def __init__(self, input, rng, d, W=None, b=None, U=None, c=None):
        """Build the symbolic computation graph.

        :param input: symbolic matrix; row 0 is the initial encoding, the
            remaining rows are the sequence folded in one at a time.
        :param rng: numpy RandomState used to initialise W and U.
        :param d: dimensionality of one input row / one encoding.
        :param W: optional encoder weights, shape (2*d, d); random if None.
        :param b: optional encoder bias, shape (d,); zeros if None.
        :param U: optional decoder weights, shape (d, 2*d); random if None.
        :param c: optional decoder bias, shape (2*d,); zeros if None.
        """
        dv = d       # size of one encoding ("visible" width)
        dh = d * 2   # size of a concatenated pair
        self.dv = dv
        self.dh = dh
        self.initx = input[0]
        self.input = input[1:]
        # NOTE: identity comparison is required here — `W == None` on a
        # Theano variable would build a symbolic expression, not a bool.
        if W is None:
            W = np.asarray(rng.uniform(
                low=-4 * np.sqrt(6. / (dv + dh)),
                high=4 * np.sqrt(6. / (dv + dh)),
                size=(dh, dv)), dtype=theano.config.floatX)
        if U is None:
            U = np.asarray(rng.uniform(
                low=-4 * np.sqrt(6. / (dv + dh)),
                high=4 * np.sqrt(6. / (dv + dh)),
                size=(dv, dh)), dtype=theano.config.floatX)
        if b is None:
            b = np.zeros((dv,), dtype=theano.config.floatX)
        if c is None:
            c = np.zeros((dh,), dtype=theano.config.floatX)
        self.W = theano.shared(value=W, name="W")
        self.U = theano.shared(value=U, name="U")
        self.b = theano.shared(value=b, name="b")
        self.c = theano.shared(value=c, name="c")
        self.params = [self.W, self.b, self.U, self.c]
        # Zero-initialised per-parameter accumulators (e.g. for momentum);
        # exposed for callers, not used by the loss graph below.
        self.updates = []
        for param in self.params:
            init = np.zeros(param.get_value(borrow=True).shape,
                            dtype=theano.config.floatX)
            self.updates.append((param, theano.shared(init)))

        def step(x1, x2):
            # x1: current input row (from `sequences`); x2: previous
            # encoding (recurrent output, seeded with self.initx).
            origin = T.concatenate([x2, x1], axis=0)
            encode = self.encode(origin)
            decode = self.decode(encode)
            return encode, decode, origin

        [encode, decode, origin], _ = theano.scan(
            fn=step,
            sequences=self.input,
            outputs_info=[self.initx, None, None]
        )
        # Regularisation terms exposed for callers (not added to the loss).
        # Fixed: L1 is the sum of absolute values, not |sum of values|.
        self.L1 = abs(self.W).sum() + abs(self.U).sum()
        self.L2_sqr = (self.W ** 2).sum() + (self.U ** 2).sum()
        self.loss = self.error(origin, decode)

    def encode(self, origin):
        """Squash a 2*d concatenation down to a d-dimensional code."""
        return T.nnet.sigmoid(T.dot(origin, self.W) + self.b)

    def decode(self, encode):
        """Linearly reconstruct the 2*d concatenation from a code."""
        return T.dot(encode, self.U) + self.c

    def error(self, origin, decode):
        """Mean squared reconstruction error between input and decoding."""
        return T.mean((decode - origin) ** 2)
if __name__ == '__main__': | |
rng = np.random.RandomState(1234) | |
d = 8 | |
X = T.dmatrix(name="X") | |
rae = RAE(input=X, rng=rng, d=d) | |
cost = rae.loss | |
l_r = 0.2 | |
gparams = [] | |
for param in rae.params: | |
gparam = T.grad(cost, param) | |
gparams.append(gparam) | |
updates = [] | |
for param, gparam in zip(rae.params, gparams): | |
update = l_r * gparam | |
updates.append((param, param - update)) | |
index = T.lscalar(name="index") | |
data = np.eye(d) | |
train_error = theano.function( | |
inputs=[], | |
outputs=rae.loss, | |
givens={ | |
X: data | |
} | |
) | |
train = theano.function( | |
inputs=[], | |
outputs=cost, | |
updates=updates, | |
givens={ | |
X: data | |
} | |
) | |
init_W = rae.W.get_value() | |
for epoch in xrange(50000): | |
loss = train() | |
print "%d\t%f" % (epoch + 1, loss) | |
print init_W | |
print rae.W.get_value() | |
print "encode step 1" | |
encode = rae.encode(T.concatenate([data[0], data[1]], axis=0)) | |
print encode.eval() | |
print "decode step 1" | |
decode = rae.decode(encode) | |
print decode.eval() | |
print "encode step 2" | |
encode = rae.encode(T.concatenate([encode, data[2]], axis=0)) | |
print encode.eval() | |
print "decode step 1" | |
decode = rae.decode(encode) | |
print decode.eval() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment