Normalizing flow experiments
""" | |
Reproducing the experiment of section 6.1 "Representative Power of Normalizing Flows" from [1]_ (figure 3). | |
This experiment visually demonstrates that normalizing flows can successfully transform a simple | |
initial approximate distribution q_0(z) to much better approximate some known non-Gaussian bivariate | |
distribution p(z). | |
The known target distributions are specified using energy functions U(z) (table 1 from [1]_). | |
p(z) = \frac{1}{Z} e^{-U(z)}, where Z is the unknown partition function (normalization constant); | |
that is, p(z) \prop e^{-U(z)}. | |
Steps | |
----- | |
1. Generate samples from initial distribution z0 ~ q_0(z) = N(z; \mu, \sigma^2 I). | |
Here \mu and \sigma can either be fixed to something "reasonable", or estimated as follows. | |
Draw auxillary random variable \eps from standard normal distribution | |
\eps ~ N(0, I) | |
and apply linear normalizing flow transformation f(\eps) = \mu + \sigma \eps, reparameterizing | |
\sigma = e^{1/2*log_var} to ensure \sigma > 0, then jointly optimize {mu, log_var} together | |
with the other normalizing flow parameters (see below). | |
2. Transform the initial samples z_0 through K normalizing flow transforms, from which we obtain the | |
transformed approximate distribution q_K(z), | |
log q_K(z) = log q_0(z) - sum_{k=1}^K log det |J_k| | |
where J_k is the Jacobian of the k-th (invertible) normalizing flow transform. | |
E.g. for planar flows, | |
log q_K(z) = log q_0(z) - sum_{k=1}^K log |1 + u_k^T \psi_k(z_{k-1})| | |
where each flow includes model parameters \lambda = {w, u, b}. | |
3. Jointly optimize all model parameters by minimizing KL-divergence between the approxmate distribution q_K(z) | |
and the true distribution p(z). | |
loss = KL[q_K(z)||p(z)] = E_{z_K~q_K(z)} [log q_K(z_K) - log p(z_K)] | |
= E_{z_K~q_K(z)} [(log q_0(z_0) - sum_k log det |J_k|) - (-U(z_K) - log(Z))] | |
= E_{z_0~q_0(z)} [log q_0(z_0) - sum_k log det |J_k| + U(f_1(f_2(..f_K(z_0)))) + log(Z)] | |
Here the partition function Z is independent of z_0 and model parameters, so we can ignore it for the optimization | |
\frac{\partial}{\partial params} loss \prop \frac{\partial}{\partial params} E_{z_0~q_0(z)} [log q0(z0) - sum_k log det |J_k| + U(z_K)] | |
References | |
---------- | |
.. [1] Jimenez Rezende, D., Mohamed, S., "Variational Inference with Normalizing Flows", | |
Proceedings of the 32nd International Conference on Machine Learning, 2015. | |
""" | |
from __future__ import division
import numpy as np
import matplotlib.pyplot as plt
import theano
import theano.tensor as T
import lasagne
from parmesan.distributions import log_stdnormal, log_normal2
from parmesan.layers import NormalizingPlanarFlowLayer, ListIndexLayer
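
# Illustrative NumPy sketch (not used by the Theano graph below) of a single planar flow
# step as defined in [1]_, added only to make the math of step 2 in the docstring concrete;
# NormalizingPlanarFlowLayer from parmesan is assumed to implement an equivalent transform
# symbolically.
#   f(z) = z + u_hat*tanh(w^T z + b),    log|det J| = log|1 + u_hat^T psi(z)|,
#   psi(z) = (1 - tanh(w^T z + b)^2)*w,
# where u_hat is the constrained version of u that keeps the transform invertible.
def planar_flow_reference(z, w, u, b):
    """z: (N, D) samples; w, u: (D,) vectors; b: scalar. Returns (f(z), log|det J|)."""
    wu = np.dot(w, u)
    m_wu = -1.0 + np.logaddexp(0.0, wu)            # m(x) = -1 + softplus(x)
    u_hat = u + (m_wu - wu) * w / np.dot(w, w)     # enforces w^T u_hat >= -1 (invertibility)
    a = np.tanh(np.dot(z, w) + b)                  # (N,)
    f_z = z + np.outer(a, u_hat)                   # (N, D)
    psi = np.outer(1.0 - a**2, w)                  # (N, D): h'(w^T z + b)*w
    logdet = np.log(np.abs(1.0 + np.dot(psi, u_hat)))
    return f_z, logdet
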
# TODO
# - why doesn't the 3rd column plot work (it should show qK(z) but instead shows q0(z))? not using Theano / Lasagne correctly? some mistake?
# - draw the hyperplane w^T z + b = 0 (a line in this case) for mode='planar'
# - during training print E[log p(zK)], E[log qK(zK)] = E[log q0(z0) - sum log det J]
# - now everything uses shape (batch_size*iw_samples, ndim); it would be nice to reshape to (batch_size, iw_samples, ndim) like in iw_vae_normflow.py,
#   although in this case there is no practical difference in the result as we aren't using any actual data
# config
mode = 'linear'  # 'linear' or 'planar'
#mode = 'planar'  # 'linear' or 'planar'
norm_p_z = False  # for mode='linear' only
use_annealed_loss = False  # annealed free energy, see paper
nflows = 2
batch_size = 100
if mode == 'linear':
    nparam_updates = 20000
elif mode == 'planar':
    nparam_updates = 100000  # 500000
report_every = 1000
nepochs = nparam_updates // report_every  # total num. of 'epochs'
lr = 1.0e-5
momentum = 0.9
iw_samples = 1  # number of samples for Monte Carlo approximation of expectation over q0(z)
# note: because we aren't using any actual data in this case, this has the same effect as multiplying batch_size

class NormalizingLinearFlowLayer(lasagne.layers.Layer):
    """
    Normalizing flow layer with transform f(z) = mu + sigma*z.

    Ensures the constraint sigma > 0 by reparameterizing sigma as log_var.

    This flow transformation is not very powerful as a general transformation;
    it is mainly intended for testing purposes.
    """
    def __init__(self, incoming,
                 mu=lasagne.init.Normal(),
                 log_var=lasagne.init.Normal(), **kwargs):
        super(NormalizingLinearFlowLayer, self).__init__(incoming, **kwargs)
        ndim = int(np.prod(self.input_shape[1:]))
        self.mu = self.add_param(mu, (ndim,), name="mu")
        self.log_var = self.add_param(log_var, (ndim,), name="log_var")

    def get_output_shape_for(self, input_shape):
        return input_shape

    def get_output_for(self, input, **kwargs):
        z = input
        sigma = T.exp(0.5*self.log_var)
        f_z = self.mu + sigma*z
        # the Jacobian of this elementwise transform is diag(sigma), so
        # log|det J| = sum(log sigma) = sum(0.5*log_var)
        logdet_jacobian = T.sum(0.5*self.log_var)
        return [f_z, logdet_jacobian]
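
# Small optional NumPy sanity check for the layer above (illustrative sketch only, not
# called anywhere): for the elementwise transform f(z) = mu + sigma*z the Jacobian is
# diag(sigma), so log|det J| = sum(log sigma) = sum(0.5*log_var), matching get_output_for().
def _check_linear_flow_logdet(seed=0):
    rng = np.random.RandomState(seed)
    log_var = rng.normal(size=2)
    sigma = np.exp(0.5*log_var)
    _, logdet = np.linalg.slogdet(np.diag(sigma))  # brute-force log|det diag(sigma)|
    assert np.allclose(logdet, np.sum(0.5*log_var))
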
def U_z(z):
    """Test energy function U(z)."""
    if mode == 'linear':
        import math
        if norm_p_z:
            # normalized; in this case the KL divergence should be >= 0,
            # at least if batch_size*iw_samples is sufficiently large for a good Monte Carlo approximation of the expectation
            c = - 0.5 * math.log(2*math.pi)
        else:
            # not normalized; in this case the KL divergence may be negative (or never reach a value close to zero)
            c = 0
        mu = np.array([0.7, -1.5]).astype(theano.config.floatX)
        log_var = np.log(np.array([2.3, 0.3])).astype(theano.config.floatX)
        log_p = c - log_var/2 - (z - mu)**2 / (2*T.exp(log_var))
        return -T.sum(log_p, axis=1)
    elif mode == 'planar':
        # energy U_1(z) from table 1 of [1]_: a ring of radius 2 with two modes near z1 = +/-2
        z1 = z[:, 0]
        z2 = z[:, 1]
        return 0.5*((T.sqrt(z1**2 + z2**2) - 2)/0.4)**2 - T.log(T.exp(-0.5*((z1 - 2)/0.6)**2) + T.exp(-0.5*((z1 + 2)/0.6)**2))
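
# Optional pure-NumPy illustration of the normalization issue noted in U_z() above (a rough
# sketch, independent of the Theano graph): for mode='linear' the target is a diagonal
# Gaussian, so a crude Riemann sum of exp(-U(z)) over a wide grid should come out close to
# Z = 1 when norm_p_z=True (c = -0.5*log(2*pi) per dimension) and close to Z = 2*pi when
# norm_p_z=False (c = 0); the missing log(Z) offset is why the KL estimate can go negative
# in the unnormalized case.
def _approx_partition_linear(normalized, gridsize=400, lim=10.0):
    mu = np.array([0.7, -1.5])
    var = np.array([2.3, 0.3])
    c = -0.5*np.log(2*np.pi) if normalized else 0.0
    side = np.linspace(-lim, lim, gridsize)
    g1, g2 = np.meshgrid(side, side)
    g = np.hstack([g1.reshape(-1, 1), g2.reshape(-1, 1)])
    log_p = np.sum(c - 0.5*np.log(var) - (g - mu)**2 / (2*var), axis=1)
    dz = (2*lim/(gridsize - 1))**2                 # grid cell area
    return np.sum(np.exp(log_p))*dz                # ~Z
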
def evaluate_bivariate_pdf(p_z, range, npoints, z_sym=None):
    """Evaluate a (possibly unnormalized) pdf over a meshgrid."""
    side = np.linspace(range[0], range[1], npoints)
    z1, z2 = np.meshgrid(side, side)
    z = np.hstack([z1.reshape(-1, 1), z2.reshape(-1, 1)])
    if z_sym is None:
        # p_z is a function building a symbolic expression from a fresh input variable
        z_sym = T.matrix('z')
        p_z_ = p_z(z_sym)
    else:
        # p_z is already a symbolic expression in terms of the given z_sym
        p_z_ = p_z
    p_z_fun = theano.function([z_sym], [p_z_], allow_input_downcast=True)
    return z1, z2, p_z_fun(z)[0].reshape((npoints, npoints))

def evaluate_bivariate_pdf_no_comp(p_z_fun, range, npoints):
    """Like evaluate_bivariate_pdf, but takes an already compiled Theano function."""
    side = np.linspace(range[0], range[1], npoints)
    z1, z2 = np.meshgrid(side, side)
    z = np.hstack([z1.reshape(-1, 1), z2.reshape(-1, 1)])
    return z1, z2, p_z_fun(z)[0].reshape((npoints, npoints))

# main script
np.random.seed(1234)

z0 = T.matrix('z0')
l_in = lasagne.layers.InputLayer((None, 2), input_var=z0)
if mode == 'linear':
    l_nf = NormalizingLinearFlowLayer(l_in, name='NF')
    l_zk = ListIndexLayer(l_nf, index=0)
    l_logdet_J_list = [ListIndexLayer(l_nf, index=1)]
elif mode == 'planar':
    l_logdet_J_list = []
    l_zk = l_in
    # ->
    # also learn an initial mapping from a standard Gaussian to a Gaussian with mean and diagonal covariance
    #l_nf = NormalizingLinearFlowLayer(l_zk, name='NF_0')
    #l_zk = ListIndexLayer(l_nf, index=0)
    #l_logdet_J_list += [ListIndexLayer(l_nf, index=1)]
    # <-
    for k in xrange(nflows):
        l_nf = NormalizingPlanarFlowLayer(l_zk, name='NF_{:d}'.format(1+k))
        l_zk = ListIndexLayer(l_nf, index=0)
        l_logdet_J_list += [ListIndexLayer(l_nf, index=1)]

train_out = lasagne.layers.get_output([l_zk] + l_logdet_J_list, z0, deterministic=False)
zK = train_out[0]
logdet_J_list = train_out[1:]

# loss function
log_q0_z0 = log_stdnormal(z0).sum(axis=1)

sum_logdet_J = 0
for logdet_J_k in logdet_J_list:
    sum_logdet_J += logdet_J_k

log_qK_zK = log_q0_z0 - sum_logdet_J

if use_annealed_loss:
    iteration = theano.shared(0)
    # inverse temperature beta_t goes from 0.01 to 1 over the first 10000 iterations;
    # here an 'iteration' is one parameter update (the counter is incremented once per
    # call to train_model via the updates dict below)
    beta_t = T.minimum(1, 0.01 + iteration/10000)
    kl = T.mean(log_qK_zK + beta_t*U_z(zK), axis=0)
else:
    kl = T.mean(log_qK_zK + U_z(zK), axis=0)
loss = kl

# updates
params = lasagne.layers.get_all_params([l_zk], trainable=True)
print('Trainable parameters:')
for p in params:
    print(' {}: {}'.format(p, p.get_value().shape if p.get_value().shape != () else 'scalar'))
grads = T.grad(loss, params)

updates_rmsprop = lasagne.updates.rmsprop(grads, params, learning_rate=lr)
updates = lasagne.updates.apply_momentum(updates_rmsprop, params, momentum=momentum)
#updates = lasagne.updates.adam(grads, params, learning_rate=0.0001)

if use_annealed_loss:
    updates[iteration] = iteration + 1

# compile
print('Compiling...')
train_model = theano.function([z0], [loss], updates=updates, allow_input_downcast=True)

# train
print('Training...')
losses = []
epoch = 0
assert nparam_updates % report_every == 0, 'should be integer multiple'
for k in xrange(nparam_updates):
    z0_ = np.random.normal(size=(batch_size*iw_samples, 2))
    loss = train_model(z0_)[0]
    losses += [loss]

    if (1+k) % report_every == 0:
        print('{:d}/{:d}: loss = {:.6f}'.format(1+epoch, nepochs, np.mean(losses)))
        if any(np.isnan(losses)):
            print('NaN loss, aborting...')
            break
        for p in params:
            print(' {}:\t{}'.format(p, p.get_value()))
        losses = []
        epoch += 1

# plot
fig = plt.figure()

ax = plt.subplot(1, 4, 1, aspect='equal')
z1, z2, phat_z = evaluate_bivariate_pdf(lambda z: T.exp(-U_z(z)), range=(-4, 4), npoints=200)
plt.pcolormesh(z1, z2, phat_z)
ax.get_xaxis().set_ticks([])
ax.get_yaxis().set_ticks([])
ax.invert_yaxis()
ax.set_title('$p(z)$')

ax = plt.subplot(1, 4, 2, aspect='equal')
z1, z2, q0_z0 = evaluate_bivariate_pdf(T.exp(log_q0_z0), range=(-4, 4), npoints=200, z_sym=z0)
plt.pcolormesh(z1, z2, q0_z0)
ax.get_xaxis().set_ticks([])
ax.get_yaxis().set_ticks([])
ax.invert_yaxis()
ax.set_title('$q_0(z)$')

ax = plt.subplot(1, 4, 3, aspect='equal')
# get the transformed distribution qK(z) directly
# XXX: doesn't work (i.e. shows the un-transformed distribution q0_z0 instead).
# Likely reason: exp(log_qK_zK) evaluated at a grid point z0=g gives the density of q_K at
# the *transformed* point zK = f(g), but it is plotted at the location g itself. For the
# linear flow q_K(f(g)) = q_0(g)/prod(sigma), i.e. exactly q_0 up to a constant scale,
# which is what the plot shows. Plotting q_K(z) on the grid would require evaluating the
# density at the inverse image f^{-1}(z) of each grid point (see the hedged sketch after
# this panel).
qK_zK = T.exp(log_qK_zK)
print('\nComputational graph before optimization:')
theano.printing.debugprint(qK_zK)
print('')
eval_model1 = theano.function([z0], [qK_zK], allow_input_downcast=True)
print('\nComputational graph after optimization:')
theano.printing.debugprint(eval_model1.maker.fgraph.outputs[0])
print('')
z1, z2, qK_zK_ = evaluate_bivariate_pdf_no_comp(eval_model1, range=(-4, 4), npoints=200)
ax.pcolormesh(z1, z2, qK_zK_)
ax.get_xaxis().set_ticks([])
ax.get_yaxis().set_ticks([])
ax.invert_yaxis()
ax.set_title('$q_K(z)$')
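
# Hedged sketch of how the third panel could be computed correctly for mode='linear'
# (illustrative only; the panel above is left as in the original): invert the flow
# analytically, z0 = (z - mu)/sigma, and apply the change of variables
# q_K(z) = q_0((z - mu)/sigma) / prod(sigma) directly on the plotting grid. Planar flows
# have no closed-form inverse in general, so this shortcut only covers the linear case.
if mode == 'linear':
    mu_ = l_nf.mu.get_value()
    sigma_ = np.exp(0.5*l_nf.log_var.get_value())
    g1, g2 = np.meshgrid(np.linspace(-4, 4, 200), np.linspace(-4, 4, 200))
    g = np.hstack([g1.reshape(-1, 1), g2.reshape(-1, 1)])
    z0_inv = (g - mu_) / sigma_
    qK_grid = np.exp(-0.5*np.sum(z0_inv**2, axis=1)) / (2*np.pi) / np.prod(sigma_)
    # qK_grid.reshape((200, 200)) could then be drawn with pcolormesh like the panels above
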
ax = plt.subplot(1, 4, 4, aspect='equal')
# get samples zK ~ qK(z) and plot those
# XXX: this DOES work
eval_model2 = theano.function([z0], [zK], allow_input_downcast=True)
N = 1000000  # take many samples; but will still look a little 'spotty'
z0_ = np.random.normal(size=(N, 2))
zK_ = eval_model2(z0_)[0]
ax.hist2d(zK_[:, 0], zK_[:, 1], range=[[-4, 4], [-4, 4]], bins=200)
ax.get_xaxis().set_ticks([])
ax.get_yaxis().set_ticks([])
ax.invert_yaxis()
ax.set_title(r'$z_K \sim q_K(z)$')

fig.tight_layout()
plt.show()

print('Done :)')