MLP sample code (@tomonari-masada, last active August 29, 2015): a multilayer perceptron in Theano, trained to approximate the Dirichlet(3, 3, 3) density at points of the probability simplex.

import sys
import numpy as np
import matplotlib.pyplot as plt
import theano
import theano.tensor as T
import scipy.special  # needed for scipy.special.gammaln below
rng = np.random
rng.seed(0)
class Layer(object):
    def __init__(self, W_init, b_init, activation):
        # W_init has shape (n_output, n_input); b_init has shape (n_output,)
        n_output, n_input = W_init.shape
        assert b_init.shape == (n_output,)
        self.W = theano.shared(value=W_init.astype(theano.config.floatX),
                               name='W', borrow=True)
        # store b as a column vector so it broadcasts across the columns (data points)
        self.b = theano.shared(value=b_init.reshape(n_output, 1).astype(theano.config.floatX),
                               name='b', borrow=True, broadcastable=(False, True))
        self.activation = activation
        self.params = [self.W, self.b]

    def output(self, x):
        # affine transform followed by the (optional) elementwise activation
        lin_output = T.dot(self.W, x) + self.b
        return (lin_output if self.activation is None else self.activation(lin_output))
class MLP(object):
    def __init__(self, W_init, b_init, activations):
        assert len(W_init) == len(b_init) == len(activations)
        self.layers = []
        for W, b, activation in zip(W_init, b_init, activations):
            self.layers.append(Layer(W, b, activation))
        # collect the parameters of all layers for gradient computation
        self.params = []
        for layer in self.layers:
            self.params += layer.params

    def output(self, x):
        # feed x forward through the layers
        for layer in self.layers:
            x = layer.output(x)
        return x

    def squared_error(self, x, y):
        return T.sum((self.output(x) - y)**2)
def gradient_updates_momentum(cost, params, learning_rate, momentum):
    assert momentum < 1 and momentum >= 0
    updates = []
    for param in params:
        # param_update holds the momentum-smoothed gradient (the "velocity")
        param_update = theano.shared(param.get_value()*0., broadcastable=param.broadcastable)
        updates.append((param, param - learning_rate*param_update))
        updates.append((param_update, momentum*param_update + (1. - momentum)*T.grad(cost, param)))
    return updates
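
# Note on the updates above: theano.function applies all updates using the
# pre-update values of the shared variables, so each parameter is moved with the
# old velocity and the velocity itself is then refreshed from the current gradient,
# regardless of the order of the two update pairs in the list.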
def target(args, params):
    # Dirichlet density with parameter vector params, evaluated at args;
    # computed in log space and exponentiated for numerical stability
    return np.exp(scipy.special.gammaln(np.sum(params))
                  - np.sum(scipy.special.gammaln(params))
                  + np.sum((params - 1.) * np.log(args)))
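
# Quick sanity check of target(), added as an illustration: at the centroid of the
# simplex, the Dirichlet(3, 3, 3) density is Gamma(9) / Gamma(3)**3 * (1/3.)**6,
# i.e. 5040 / 729.
assert np.isclose(target(np.ones(3) / 3., np.array([3., 3., 3.])), 5040. / 729.)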
N = 1000
# draw N points uniformly on the 2-simplex by normalizing i.i.d. Exponential(1) draws
s = rng.exponential(1., N*3)
s = s.reshape(3, N)
s /= np.sum(s, axis=0)
X = []
y = []
for i in range(N):
    X.append(s[:, i])
    y.append(target(s[:, i], np.array([3., 3., 3.])))
X = np.array(X).T   # shape (3, N): one column per training point
y = np.array(y)     # true Dirichlet(3, 3, 3) density at each point
# layer widths 3 -> 6 -> 9 -> 9 -> 6 -> 3 -> 1, with the input dimension X.shape[0] = 3
layer_sizes = [X.shape[0], X.shape[0]*2, X.shape[0]*3, X.shape[0]*3, X.shape[0]*2, X.shape[0], 1]
W_init = []
b_init = []
activations = []
for n_input, n_output in zip(layer_sizes[:-1], layer_sizes[1:]):
    W_init.append(np.random.randn(n_output, n_input))
    b_init.append(np.ones(n_output))
    activations.append(T.nnet.sigmoid)
# linear output layer: the target density takes values larger than 1, which a sigmoid cannot produce
activations[-1] = None
mlp = MLP(W_init, b_init, activations)
mlp_input = T.matrix('mlp_input')
mlp_target = T.vector('mlp_target')
# keep the learning rate in a shared variable so that the decay applied in the
# training loop below is seen by the compiled update rule
base_learning_rate = 0.0001
learning_rate = theano.shared(np.asarray(base_learning_rate, dtype=theano.config.floatX),
                              name='learning_rate')
momentum = 0.9
cost = mlp.squared_error(mlp_input, mlp_target)
train = theano.function([mlp_input, mlp_target], cost,
                        updates=gradient_updates_momentum(cost, mlp.params, learning_rate, momentum))
mlp_output = theano.function([mlp_input], mlp.output(mlp_input))
iteration = 0
max_iteration = 100000
bsize = 100
while iteration < max_iteration:
    # sample a mini-batch of bsize training points (with replacement)
    indices = rng.randint(N, size=bsize)
    current_cost = train(X[:, indices], y[indices])
    current_output = mlp_output(X)
    if iteration % 10 == 0:
        # report the root-mean-squared error over the full training set
        print('{:.5f}'.format(np.sqrt(np.sum((current_output - y)**2) / N)))
        sys.stdout.flush()
    iteration += 1
    # decay the learning rate as base_learning_rate / (1 + 0.001 * iteration)
    learning_rate.set_value(np.asarray(base_learning_rate / (1. + 0.001 * iteration),
                                       dtype=theano.config.floatX))
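
# A minimal usage sketch (not part of the gist itself): evaluate the trained
# network at one point of the simplex and compare with the exact density.
test_point = np.array([0.2, 0.3, 0.5])
approx = mlp_output(test_point.reshape(3, 1).astype(theano.config.floatX))
exact = target(test_point, np.array([3., 3., 3.]))
print('MLP approximation: {:.5f}  exact density: {:.5f}'.format(float(approx), exact))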