Skip to content

Instantly share code, notes, and snippets.

@neosatrapahereje
Created October 16, 2015 15:31
Show Gist options
  • Save neosatrapahereje/b7427ab63dfc0510cd7b to your computer and use it in GitHub Desktop.
Save neosatrapahereje/b7427ab63dfc0510cd7b to your computer and use it in GitHub Desktop.
Conjugate gradient optimization for Lasagne
from collections import OrderedDict
import numpy as np
import theano
import theano.tensor as T
from theano.ifelse import ifelse
def cg(loss, params, x0=None, max_iters=100, precondition=None, tol=1e-3,
conv_crit='cg'):
"""(Preconditioned) Conjugate Gradient (CG) updates
Generates update expressions of the form:
* ``param := optimized_param``
Parameters
----------
loss : symbolic expression
A scalar loss expression
params : list of shared variables
The variables to generate update expressions for
x0 : shared variable or list of shared variables.
Initialization values for CG. If None, the list is initialized
with zeros of the same size as the parameters
max_iters : int
Maximum number of iterations for CG. Default is 100.
precondition : array or list of arrays
Preconditioner for CG. Default is None.
tol : float
Tolerance for CG. Default is 1e-3
conv_crit : 'cg' or 'hf'
Convergence criterion for stopping CG iterations. If 'cg', the
standard convergence criterion for CG is used. If 'hf', the modified
convergence criterion by Martens is used
Returns
-------
OrderedDict
A dictionary mapping each parameter to its update expression
"""
if not isinstance(params, (list, tuple)):
params = [params]
if precondition is None:
precondition = 1.
if not isinstance(precondition, (list, tuple)):
precondition = [precondition] * len(params)
if x0 is None:
x0 = [theano.shared(np.zeros_like(p.get_value())) for p in params]
if not isinstance(x0, (list, tuple)):
x0 = [x0]
n_params = len(params)
def loop(i, phi_im1, *args):
vi = args[:n_params]
xi = args[n_params:2 * n_params]
ri = args[2 * n_params:3 * n_params]
alpha = args[3 * n_params]
ui = []
for param in params:
uip = T.Rop(T.grad(loss, param), params, vi)
if isinstance(uip, (list, tuple)):
ui.append(uip[0])
else:
ui.append(uip)
t = alpha / sum((vj * uj).sum() for vj, uj in zip(vi, ui))
xi = [x + t * v for x, v in zip(xi, vi)]
ri = [r - t * u for r, u in zip(ri, ui)]
wi = [pc * r for r, pc in zip(ri, precondition)]
beta = sum((w * w).sum() for w in wi)
s = beta / alpha
vi = [pc * w + s * v for w, v, pc in zip(wi, vi, precondition)]
alpha = beta
# Convergence criteria
if conv_crit == 'hf':
Ax = []
for param in params:
ax = T.Rop(T.grad(loss, param), params, xi)
if isinstance(ax, (list, tuple)):
Ax.append(ax[0])
else:
Ax.append(ax)
b = [r + a for r, a in zip(ri, Ax)]
phi_i = sum(0.5 * (x * ax).sum() - (x * bb).sum()
for x, ax, bb in zip(xi, ui, b))
k = T.cast(T.floor(T.max([10, .1 * i])), 'int32')
phi_crit = (phi_i - phi_im1) / phi_i
cc1 = T.lt(k, i)
cc2 = T.lt(phi_i, 0)
cc3 = T.lt(phi_crit, k * tol)
phi_im1 = ifelse(T.lt(i, 10), phi_im1, phi_i)
convergence_crit = T.and_(cc1, T.and_(cc2, cc3))
elif conv_crit == 'cg':
cc1 = T.lt(beta, tol)
cc2 = T.lt(sum((r * r).sum() for r in ri), tol)
convergence_crit = T.and_(cc1, cc2)
return ([phi_im1] + vi + xi + ri + [alpha],
theano.scan_module.until(convergence_crit))
# Initialize variables
r0 = theano.clone(theano.grad(loss, params), replace=dict(zip(params, x0)))
if not isinstance(r0, (list, tuple)):
r0 = [r0]
r0 = [- r for r in r0]
w0 = [pc * r for r, pc in zip(r0, precondition)]
v0 = [pc * w for w, pc in zip(w0, precondition)]
alpha = sum((w * w).sum() for w in w0)
phi_im1 = np.array(0, dtype=theano.config.floatX)
outs, updates = theano.scan(
loop,
outputs_info=([phi_im1] + v0 + x0 + r0 + [alpha]),
sequences=[T.arange(max_iters)]
)
x_sol = outs[1 + n_params: 2 * n_params + 1]
updates = OrderedDict(updates)
for param, sol in zip(params, x_sol):
updates[param] = sol[-1]
return updates
def test_cg_Burden():
print "Example from Burden and Faires' Numerical Analysis, 7th Ed."
# Example from Chapter 7, pp. 477 (Example 3)
# Solve Ax = b
A = np.array([
[0.2, 0.1, 1, 1, 0],
[0.1, 4, -1, 1, -1],
[1, -1, 60, 0, -2],
[1, 1, 0, 8, 4],
[0, -1, -2, 4, 700]]).astype(theano.config.floatX)
b = np.array([1, 2, 3, 4, 5]).astype(theano.config.floatX)
x = theano.shared(np.zeros_like(b).astype(theano.config.floatX), name='x')
real_sol = np.array([7.859713071,
0.4229264082,
-0.07359223906,
-0.5406430164,
0.01062616286]).astype(theano.config.floatX)
precondition = 1 / np.sqrt(A.diagonal())
# Loss function for conjugate gradient
loss = (0.5 * T.dot(x.T, T.dot(A, x)) - T.dot(x, b))
x_sol_hf = cg(loss, x, precondition=precondition, conv_crit='hf')
x_sol_cg = cg(loss, x, precondition=precondition, conv_crit='cg')
x_sol_hf_fun = theano.function([], x_sol_hf[x])
x_sol_cg_fun = theano.function([], x_sol_cg[x])
solution_hf = x_sol_hf_fun()
solution_cg = x_sol_cg_fun()
print 'Real solution:'
print real_sol
print 'CG solution (convergence criterion:hf):'
print solution_hf
print'MSE'
print ((solution_hf - real_sol)**2).sum()
print 'CG solution (convergence criterion:cg):'
print solution_cg
print 'MSE'
print ((solution_cg - real_sol)**2).sum()
def test_lasagne(n_features, n_hidden, n_samples_train, n_samples_test):
import lasagne
print 'Toy Neural Network example'
l_in = lasagne.layers.InputLayer((None, n_features))
l_out = lasagne.layers.DenseLayer(
l_in, n_features, nonlinearity=None)
y = lasagne.layers.get_output(l_out)
params = lasagne.layers.get_all_params(l_out)
t = T.matrix()
loss = lasagne.objectives.squared_error(y, t).mean()
updates = cg(loss, params, conv_crit='hf')
loss_fun = theano.function([l_in.input_var, t], loss, updates=updates)
test_fun = theano.function([l_in.input_var, t], loss)
predict = theano.function([l_in.input_var], y)
X_train = np.random.rand(
n_samples_train, n_features).astype(theano.config.floatX)
y_train = 2. * X_train
X_test = np.random.rand(
n_samples_test, n_features).astype(theano.config.floatX)
y_test = 2. * X_test
predictions_before = predict(X_test)
test_loss_before = test_fun(X_test, y_test)
print 'Test loss before training:{0}'.format(test_loss_before)
print 'Output/Input ratio (expected is 2):{0}'.format(
predictions_before.sum() / X_test.sum())
# Training
loss_fun(X_train, y_train)
test_loss_after = test_fun(X_test, y_test)
predictions_after = predict(X_test)
print 'Test loss after training:{0}'.format(test_loss_after)
print 'Output/Input ratio (expected is 2):{0}'.format(
predictions_after.sum() / X_test.sum())
if __name__ == '__main__':
test_cg_Burden()
test_lasagne(n_features=100,
n_hidden=50,
n_samples_train=1000,
n_samples_test=100)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment