@dendisuhubdy
Forked from botev/run_yellow_fin_vae.py
Created July 4, 2017 12:51
Theano Yellow Fin
import numpy as np
import theano
import theano.tensor as T
from theano.printing import Print
from collections import OrderedDict

def yellow_fin(loss, params, beta=0.99,
               learning_rate_init=0.01, momentum_init=0.0,
               t=None, window_width=20, debug=False):
    """
    The Yellow Fin algorithm.
    :param loss: Theano expression of the loss
    :param params: list of shared variables to optimise
    :param beta: the moving average smoothing variable
    :param learning_rate_init: initial learning rate
    :param momentum_init: initial momentum
    :param t: optionally pass your own time variable
    :param window_width: width of the window for calculating h_max and h_min
    :param debug: flag for debugging
    :return: an OrderedDict of updates to pass to theano.function
    """
    grads = T.grad(loss, params)
    updates = OrderedDict()
    alpha = theano.shared(value_floatX(learning_rate_init), name="learning_rate")
    mu = theano.shared(value_floatX(momentum_init), name="momentum")
    if t is None:
        t = theano.shared(np.asarray(0).astype(np.int32), name="t")
        updates[t] = t + 1
    # Fetch the tracked quantities from the helper routines
    h_max, h_min = curvature_range(grads, beta, t, updates, window_width)
    c = gradient_variance(grads, params, beta, updates)
    d = distance_to_optim(grads, beta, updates)
    if debug:
        h_max = print_values(h_max, "h_max")
        h_min = print_values(h_min, "h_min")
        c = print_values(c, "c")
        d = print_values(d, "d")
    # Get the solution to the minimisation problem for mu
    sqrt_mu1 = solve(c, d, h_min)
    if debug:
        sqrt_mu1 = print_values(sqrt_mu1, "sqrt_mu1")
    sqrt_mu2 = (T.sqrt(h_max) - T.sqrt(h_min)) / (T.sqrt(h_max) + T.sqrt(h_min))
    sqrt_mu = T.maximum(sqrt_mu1, sqrt_mu2)
    # Given the solution, compute the final mu_t and alpha_t
    alpha_t = T.sqr(1 - sqrt_mu) / h_min
    mu_t = T.sqr(sqrt_mu)
    if debug:
        mu_t = print_values(mu_t, "mu_t")
        alpha_t = print_values(alpha_t, "alpha_t")
    # Update the moving averages of the learning rate and momentum
    updates[mu] = ema(beta, mu, mu_t)
    updates[alpha] = ema(beta, alpha, alpha_t)
    if debug:
        updates[mu] = print_values(updates[mu], "mu")
        updates[alpha] = print_values(updates[alpha], "alpha")
    # Apply momentum SGD with the tuned learning rate and momentum
    momentum(grads, params, updates[alpha], updates[mu], updates)
    return updates

def curvature_range(grads, beta, t, updates, window_width=20, debug=False):
    """
    Routine for calculating the h_max and h_min curvature range.
    """
    # Update the sliding window of squared gradient norms
    window = theano.shared(np.zeros((window_width,), dtype=theano.config.floatX),
                           name="window")
    t_mod = T.mod(t, window_width)
    updates[window] = T.set_subtensor(window[t_mod], sum(T.sum(T.sqr(g)) for g in grads))
    if debug:
        updates[window] = print_values(updates[window], "window")
    # Get h_max_t and h_min_t over the filled part of the window
    t = T.minimum(t + 1, window_width)
    h_max_t = T.max(updates[window][:t])
    h_min_t = T.min(updates[window][:t])
    # Update the moving averages
    h_max = theano.shared(value_floatX(0.0), name="h_max")
    h_min = theano.shared(value_floatX(0.0), name="h_min")
    updates[h_max] = ema(beta, h_max, h_max_t)
    updates[h_min] = ema(beta, h_min, h_min_t)
    return updates[h_max], updates[h_min]
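
# Illustration only (not part of the original gist): a plain-numpy sketch of the
# same sliding-window tracking, assuming `sq_grad_norms` is a stream of squared
# gradient norms. Handy for checking the Theano graph above against a simple loop.
def _curvature_range_numpy(sq_grad_norms, beta=0.99, window_width=20):
    window = np.zeros(window_width)
    h_max = h_min = 0.0
    for t, norm in enumerate(sq_grad_norms):
        # Overwrite the oldest entry and look only at the filled part of the window
        window[t % window_width] = norm
        filled = window[:min(t + 1, window_width)]
        h_max = ema(beta, h_max, filled.max())
        h_min = ema(beta, h_min, filled.min())
    return h_max, h_min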

def gradient_variance(grads, params, beta, updates):
    """
    Routine for calculating the variance of the gradients.
    """
    # Total variance
    variance = 0
    for param, grad in zip(params, grads):
        # Make shared variables
        mom1 = shared_mirror(param)
        mom2 = shared_mirror(param)
        # Update moving averages
        updates[mom1] = ema(beta, mom1, grad)
        updates[mom2] = ema(beta, mom2, T.sqr(grad))
        # Update the total variance
        variance += T.sum(T.abs_(updates[mom2] - T.sqr(updates[mom1])))
    return variance
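
# Illustration only (not part of the original gist): the same variance estimate in
# plain numpy for a single parameter, assuming `grad_history` is an iterable of
# gradient arrays. It tracks EMAs of g and g^2 and sums |E[g^2] - E[g]^2|.
def _gradient_variance_numpy(grad_history, beta=0.99):
    mom1 = mom2 = 0.0
    for grad in grad_history:
        mom1 = ema(beta, mom1, np.asarray(grad))
        mom2 = ema(beta, mom2, np.square(grad))
    return np.sum(np.abs(mom2 - np.square(mom1)))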

def distance_to_optim(grads, beta, updates):
    """
    Routine for calculating the distance to the optimum.
    """
    # Had an issue with initialising to 0.0, so switched to 1.0
    g = theano.shared(value_floatX(1.0), name="g")
    h = theano.shared(value_floatX(1.0), name="h")
    d = theano.shared(value_floatX(1.0), name="d")
    # Squared L2 norm of the full gradient
    l2_norm = sum(T.sum(T.sqr(grad)) for grad in grads)
    updates[g] = ema(beta, g, T.sqrt(l2_norm))
    updates[h] = ema(beta, h, l2_norm)
    updates[d] = ema(beta, d, updates[g] / updates[h])
    return updates[d]

def solve(c, d, h_min, debug=False):
    # We minimise x^2 * D^2 + (1 - x)^4 * C / h_min^2 over x = sqrt(mu).
    # Setting the derivative to zero reduces to solving the cubic
    #   y^3 + p * y + p = 0
    # with y = x - 1 and p = (D^2 * h_min^2) / (2 * C).
    p = (T.sqr(d) * T.sqr(h_min)) / (2 * c)
    w3 = p * (T.sqrt(0.25 + p / 27.0) - 0.5)
    w = T.power(w3, 1.0 / 3.0)
    y = w - p / (3 * w)
    sqrt_mu = y + 1
    if debug:
        value = print_values(y * y * y + p * y + p, "derivative_value")
        sqrt_mu += 1e-20 * value
    return sqrt_mu
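
# Sanity check (not part of the original gist): the closed-form root above can be
# verified numerically. For scalar c, d, h_min > 0, the intermediate y = x - 1
# should satisfy y^3 + p*y + p = 0 with p = d^2 * h_min^2 / (2 * c).
def _check_solve_numpy(c, d, h_min):
    p = (d ** 2) * (h_min ** 2) / (2.0 * c)
    w = (p * (np.sqrt(0.25 + p / 27.0) - 0.5)) ** (1.0 / 3.0)
    y = w - p / (3.0 * w)
    # Residual of the cubic; should be close to zero
    return y ** 3 + p * y + p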

def momentum(grads, params, learning_rate, momentum, updates=None):
    """
    Standard momentum - copied from the Lasagne library.
    """
    updates = OrderedDict() if updates is None else updates
    velocities = [shared_mirror(p) for p in params]
    for param, grad, v in zip(params, grads, velocities):
        updates[v] = v * momentum - learning_rate * grad
        updates[param] = param + updates[v]
    return updates

def print_values(var, msg):
    """
    Makes an op to print the values of the variable with the given message.
    """
    return Print(msg)(var)


def ema(alpha, s_t, x_t):
    """
    Exponential moving average.
    """
    return alpha * s_t + (1 - alpha) * x_t


def value_floatX(x):
    """
    Converts the value to a numpy array of type theano.config.floatX.
    """
    return np.asarray(x).astype(theano.config.floatX)


def shared_mirror(shared):
    """
    Creates a shared variable with the same specs as the input.
    """
    value = shared.get_value(borrow=True)
    return theano.shared(np.zeros(value.shape, dtype=value.dtype),
                         broadcastable=shared.broadcastable)
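
# Usage sketch (not part of the original gist): a minimal example of wiring
# yellow_fin into a theano.function for a toy linear-regression loss. The data,
# shapes, and number of epochs below are made up purely for illustration.
if __name__ == "__main__":
    rng = np.random.RandomState(0)
    W = theano.shared(value_floatX(rng.randn(5, 1) * 0.01), name="W")
    b = theano.shared(value_floatX(np.zeros((1,))), name="b")
    x = T.matrix("x")
    y = T.matrix("y")
    y_hat = T.dot(x, W) + b
    loss = T.mean(T.sqr(y_hat - y))
    # Build the YellowFin updates and compile a training step
    updates = yellow_fin(loss, [W, b])
    train_fn = theano.function([x, y], loss, updates=updates)
    data_x = rng.randn(100, 5).astype(theano.config.floatX)
    data_y = data_x.sum(axis=1, keepdims=True).astype(theano.config.floatX)
    for epoch in range(10):
        print("epoch %d, loss %f" % (epoch, train_fn(data_x, data_y)))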