A minimal implementation of a denoising autoencoder. The error function is the cross-entropy of the reconstruction, optimized by SGD with mini-batches. The dataset is available at http://deeplearning.net/data/mnist/mnist.pkl.gz
# coding: utf-8
"""
autoencoder.py

1. Download this gist.
2. Get the MNIST data:
       wget http://deeplearning.net/data/mnist/mnist.pkl.gz
3. Run this code:
       python autoencoder.py 100 -e 1 -b 20 -v
"""
import numpy
import argparse
import cPickle as pickle
import utils
class Autoencoder(object):
    def __init__(self, n_visible=784, n_hidden=784,
                 W1=None, W2=None, b1=None, b2=None,
                 noise=0.0, untied=False):
        self.rng = numpy.random.RandomState(1)
        # range for the uniform random weight initialization
        r = numpy.sqrt(6. / (n_hidden + n_visible + 1))

        if W1 is None:
            W1 = self.random_init(r, (n_hidden, n_visible))
        self.W1 = W1
        if W2 is None:
            if untied:
                W2 = self.random_init(r, (n_visible, n_hidden))
            else:
                W2 = self.W1.T  # tied weights: W2 is a view of W1
        self.W2 = W2
        if b1 is None:
            b1 = numpy.zeros(n_hidden)
        self.b1 = b1
        if b2 is None:
            b2 = numpy.zeros(n_visible)
        self.b2 = b2

        self.n_visible = n_visible
        self.n_hidden = n_hidden
        self.alpha = 0.1        # learning rate
        self.noise = noise      # probability of zeroing each input unit
        self.untied = untied
    def random_init(self, r, size):
        return numpy.array(self.rng.uniform(low=-r, high=r, size=size))

    def sigmoid(self, x):
        return 1. / (1. + numpy.exp(-x))

    def sigmoid_prime(self, x):
        # derivative of the sigmoid, expressed in terms of its output value
        return x * (1. - x)

    def corrupt(self, x, noise):
        # randomly zero each input unit with probability `noise`
        return self.rng.binomial(size=x.shape, n=1, p=1.0 - noise) * x

    def encode(self, x):
        return self.sigmoid(numpy.dot(self.W1, x) + self.b1)

    def decode(self, y):
        return self.sigmoid(numpy.dot(self.W2, y) + self.b2)

    def get_cost(self, x, z):
        # cross-entropy between the input x and its reconstruction z
        eps = 1e-10
        return - numpy.sum(x * numpy.log(z + eps) + (1. - x) * numpy.log(1. - z + eps))
    def get_cost_and_grad(self, x_batch, dnum):
        """Average cost and gradients over one mini-batch (plain backprop)."""
        cost = 0.
        grad_W1 = numpy.zeros(self.W1.shape)
        grad_W2 = numpy.zeros(self.W2.shape)
        grad_b1 = numpy.zeros(self.b1.shape)
        grad_b2 = numpy.zeros(self.b2.shape)

        for x in x_batch:
            tilde_x = self.corrupt(x, self.noise)
            p = self.encode(tilde_x)
            y = self.decode(p)
            cost += self.get_cost(x, y)

            # output-layer error: for sigmoid units with cross-entropy cost
            # the delta is simply (reconstruction - target)
            delta1 = - (x - y)
            if self.untied:
                grad_W2 += numpy.outer(delta1, p)
            else:
                grad_W1 += numpy.outer(delta1, p).T
            grad_b2 += delta1

            # backpropagate through the hidden layer
            delta2 = numpy.dot(self.W2.T, delta1) * self.sigmoid_prime(p)
            grad_W1 += numpy.outer(delta2, tilde_x)
            grad_b1 += delta2

        cost /= len(x_batch)
        grad_W1 /= len(x_batch)
        grad_W2 /= len(x_batch)
        grad_b1 /= len(x_batch)
        grad_b2 /= len(x_batch)
        return cost, grad_W1, grad_W2, grad_b1, grad_b2
    def train(self, X, epochs=15, batch_size=20):
        batch_num = len(X) / batch_size
        for epoch in range(epochs):
            total_cost = 0.0
            for i in range(batch_num):
                batch = X[i * batch_size : (i + 1) * batch_size]
                cost, gradW1, gradW2, gradb1, gradb2 = \
                        self.get_cost_and_grad(batch, len(X))
                total_cost += cost
                self.W1 -= self.alpha * gradW1
                self.W2 -= self.alpha * gradW2
                self.b1 -= self.alpha * gradb1
                self.b2 -= self.alpha * gradb2
                grad_sum = gradW1.sum() + gradW2.sum() + gradb1.sum() + gradb2.sum()  # unused; handy when debugging
            print epoch,
            print (1. / batch_num) * total_cost
    def dump_weights(self, save_path):
        with open(save_path, 'wb') as f:
            d = {
                "W1": self.W1,
                "W2": self.W2,
                "b1": self.b1,
                "b2": self.b2,
            }
            pickle.dump(d, f)

    def visualize_weights(self):
        # render the rows of W1 as a 10x10 panel of square image tiles
        tile_size = (int(numpy.sqrt(self.W1[0].size)), int(numpy.sqrt(self.W1[0].size)))
        panel_shape = (10, 10)
        return utils.visualize_weights(self.W1, panel_shape, tile_size)
        #panel_shape = (int(numpy.sqrt(self.W1.shape[0])), int(numpy.sqrt(self.W1.shape[0])))
        #return utils.visualize_weights(self.W1, panel_shape, tile_size)
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("n_hidden", type=int)
    parser.add_argument("-e", "--epochs", type=int, default=15)
    parser.add_argument("-b", "--batch_size", type=int, default=20)
    parser.add_argument("-n", "--noise", type=float,
                        choices=[i / 10. for i in xrange(11)], default=0.0)
    parser.add_argument('-o', '--output', type=unicode)
    parser.add_argument('-v', '--visualize', action="store_true")
    parser.add_argument('-u', '--untied', action="store_true")
    args = parser.parse_args()

    train_data, valid_data, test_data = utils.load_data()

    ae = Autoencoder(n_hidden=args.n_hidden, noise=args.noise, untied=args.untied)
    try:
        ae.train(train_data[0], epochs=args.epochs, batch_size=args.batch_size)
    except KeyboardInterrupt:
        exit()

    save_name = args.output
    if save_name is None:
        save_name = '%sh%d_e%d_b%d_n%d' % (
            'untied_' if args.untied else 'tied_',
            args.n_hidden,
            args.epochs,
            args.batch_size,
            args.noise * 100,
        )

    img = ae.visualize_weights()
    img.save(save_name + ".bmp")
    if args.visualize:
        img.show()

    ae.dump_weights(save_name + '.pkl')
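The pickle written by dump_weights can be reloaded to inspect the trained model without retraining. The snippet below is a minimal sketch, assuming the weight file produced by the default run in the docstring (tied_h100_e1_b20_n0.pkl) and mnist.pkl.gz in the working directory; reconstruct.py is a hypothetical file name, not part of the gist.

# reconstruct.py -- hypothetical companion script, not part of the gist
import cPickle as pickle
import utils
from autoencoder import Autoencoder

train_set, valid_set, test_set = utils.load_data()

with open('tied_h100_e1_b20_n0.pkl', 'rb') as f:   # name produced by the example run above
    d = pickle.load(f)

ae = Autoencoder(n_hidden=d["W1"].shape[0],
                 W1=d["W1"], W2=d["W2"], b1=d["b1"], b2=d["b2"])

x = train_set[0][0]             # first training image as a 784-vector
z = ae.decode(ae.encode(x))     # reconstruction
print ae.get_cost(x, z)         # per-example cross-entropy of the reconstruction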
# utils.py -- helpers for loading MNIST and rendering weight tiles
import numpy
import cPickle as pickle
import gzip
import Image


def load_data():
    # mnist.pkl.gz holds (train_set, valid_set, test_set),
    # each an (images, labels) pair
    with gzip.open('mnist.pkl.gz', 'rb') as f:
        train_set, valid_set, test_set = pickle.load(f)
    return train_set, valid_set, test_set


def visualize_weights(weights, panel_shape, tile_size):
    def scale(x):
        # rescale a tile to the 0-255 range
        eps = 1e-8
        x = x.copy()
        x -= x.min()
        x *= 1.0 / (x.max() + eps)
        return 255.0 * x

    margin_y = numpy.zeros(tile_size[1])
    margin_x = numpy.zeros((tile_size[0] + 1) * panel_shape[0])
    image = margin_x.copy()

    for y in range(panel_shape[1]):
        # lay one row of tiles side by side, with a one-pixel margin after each
        tmp = numpy.hstack([numpy.c_[scale(x.reshape(tile_size)), margin_y]
                            for x in weights[y * panel_shape[0]:(y + 1) * panel_shape[0]]])
        tmp = numpy.vstack([tmp, margin_x])
        image = numpy.vstack([image, tmp])

    img = Image.fromarray(image.astype(numpy.uint8))
    img = img.convert('RGB')
    return img
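As a quick usage example (a sketch, not part of the gist), the same helper can tile raw MNIST digits instead of learned filters, which is a handy sanity check that load_data and the tiling code agree on the 28x28 layout; mnist_tiles.bmp is an arbitrary output name.

# Hypothetical sanity check: tile the first 100 training digits.
import utils

train_set, valid_set, test_set = utils.load_data()
img = utils.visualize_weights(train_set[0][:100], panel_shape=(10, 10), tile_size=(28, 28))
img.save('mnist_tiles.bmp')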
"""
Ad hoc code for gradient checking.
"""
from autoencoder import Autoencoder
import utils
import numpy
tr, te, vl = utils.load_data()
ae = Autoencoder(n_hidden = 50)
#ae = Autoencoder(n_hidden = 50, untied = True)
x = [tr[0][0]]
noise = 0.0
tilde_x = [ae.corrupt(x[0], noise)]
c, gW1, gW2, gb1, gb2 = ae.get_cost_and_grad(tilde_x, len(tr[0]))
print c
epsilon = 1e-4
print "W1"
for i in range(len(ae.W1)):
for j in range(len(ae.W1[i])):
ae.W1[i][j] += epsilon
y = ae.decode(ae.encode(tilde_x[0]))
e_p = ae.get_cost(x[0], y)
ae.W1[i][j] -= 2. * epsilon
y = ae.decode(ae.encode(tilde_x[0]))
e_m = ae.get_cost(x[0], y)
g = ( e_p - e_m ) / (2. * epsilon)
ae.W1[i][j] += epsilon
diff = gW1[i][j] - g
if numpy.absolute(diff) >= epsilon**2:
print i,j, gW1[i][j], g, diff
print "W2"
for i in range(len(ae.W2)):
for j in range(len(ae.W2[i])):
ae.W2[i][j] += epsilon
e_p = ae.get_cost(tilde_x[0], x[0])
ae.W2[i][j] -= 2 * epsilon
e_m = ae.get_cost(tilde_x[0], x[0])
g = ( e_p - e_m ) / (2 * epsilon)
ae.W2[i][j] += epsilon
diff = gW2[i][j] - g
if numpy.absolute(diff) >= epsilon**2:
print i,j, gW2[i][j], g, diff
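The script above only checks W1 and W2; the bias gradients can be verified with the same central-difference recipe. The function below is a sketch of such a check (check_bias is a name introduced here, not part of the gist):

def check_bias(ae, param, grad, x, tilde_x, epsilon=1e-4):
    # Hypothetical helper: central finite-difference check for a bias vector.
    # `param` is ae.b1 or ae.b2 (perturbed in place), `grad` the analytic gradient.
    for i in range(len(param)):
        param[i] += epsilon
        e_p = ae.get_cost(x, ae.decode(ae.encode(tilde_x)))
        param[i] -= 2. * epsilon
        e_m = ae.get_cost(x, ae.decode(ae.encode(tilde_x)))
        param[i] += epsilon                      # restore
        g = (e_p - e_m) / (2. * epsilon)
        if numpy.absolute(grad[i] - g) >= epsilon**2:
            print i, grad[i], g, grad[i] - g

check_bias(ae, ae.b1, gb1, x[0], tilde_x[0])
check_bias(ae, ae.b2, gb2, x[0], tilde_x[0])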