Normalizing flow experiments
"""
Reproducing the experiment of section 6.1 "Representative Power of Normalizing Flows" from [1]_ (figure 3).
This experiment visually demonstrates that normalizing flows can successfully transform a simple
initial approximate distribution q_0(z) into one that much better approximates a known non-Gaussian
bivariate distribution p(z).
The known target distributions are specified using energy functions U(z) (table 1 from [1]_),
    p(z) = \frac{1}{Z} e^{-U(z)},
where Z is the unknown partition function (normalization constant); that is, p(z) \propto e^{-U(z)}.
Steps
-----
1. Generate samples from the initial distribution z_0 ~ q_0(z) = N(z; \mu, \sigma^2 I).
   Here \mu and \sigma can either be fixed to something "reasonable", or estimated as follows.
   Draw an auxiliary random variable \eps from a standard normal distribution,
       \eps ~ N(0, I),
   and apply the linear normalizing flow transformation f(\eps) = \mu + \sigma \eps, reparameterizing
   \sigma = e^{(1/2) log_var} to ensure \sigma > 0, then jointly optimize {\mu, log_var} together
   with the other normalizing flow parameters (see below).
2. Transform the initial samples z_0 through K normalizing flow transforms, from which we obtain the
   transformed approximate distribution q_K(z),
       log q_K(z) = log q_0(z) - \sum_{k=1}^K log |det J_k|,
   where J_k is the Jacobian of the k-th (invertible) normalizing flow transform.
   E.g. for planar flows,
       log q_K(z) = log q_0(z) - \sum_{k=1}^K log |1 + u_k^T \psi_k(z_{k-1})|,
   where each flow k has model parameters \lambda_k = {w_k, u_k, b_k}.
3. Jointly optimize all model parameters by minimizing the KL divergence between the approximate
   distribution q_K(z) and the true distribution p(z),
       loss = KL[q_K(z)||p(z)] = E_{z_K~q_K(z)} [log q_K(z_K) - log p(z_K)]
            = E_{z_K~q_K(z)} [(log q_0(z_0) - \sum_k log |det J_k|) - (-U(z_K) - log Z)]
            = E_{z_0~q_0(z)} [log q_0(z_0) - \sum_k log |det J_k| + U(f_K(...f_2(f_1(z_0))...)) + log Z].
   Here the partition function Z is independent of z_0 and of the model parameters, so we can ignore it
   for the optimization:
       \frac{\partial}{\partial params} loss \propto \frac{\partial}{\partial params} E_{z_0~q_0(z)} [log q_0(z_0) - \sum_k log |det J_k| + U(z_K)].
References
----------
.. [1] Jimenez Rezende, D., Mohamed, S., "Variational Inference with Normalizing Flows",
Proceedings of the 32nd International Conference on Machine Learning, 2015.
"""
from __future__ import division
import numpy as np
import matplotlib.pyplot as plt
import theano
import theano.tensor as T
import lasagne
from parmesan.distributions import log_stdnormal, log_normal2
from parmesan.layers import NormalizingPlanarFlowLayer, ListIndexLayer
# TODO
# - why doesn't 3rd column plot work (should show qK(z) but instead shows q0(z))? not using Theano / Lasagne correctly? some mistake?
# - draw hyperplane w^T z + b = 0 (line in this case) for mode='planar'
# - during training print E[log p(zK)], E[log qK(zK)] = E[log q0(z0) - sum log det J]
# - now everything uses shape (batch_size*iw_samples, ndim); would be nice to reshape to (batch_size, iw_samples, ndim) like in iw_vae_normflow.py,
# although in this case there's no practical difference in the result as we aren't using any actual data
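# A minimal standalone NumPy sketch (not used anywhere below) of what a single planar flow step
# computes, following the planar flow definition in [1]_. It is only meant to clarify step 2 of the
# module docstring, under the assumption that parmesan's NormalizingPlanarFlowLayer implements the
# same transform; the names planar_flow_step and u_hat are illustrative and not part of parmesan.
def planar_flow_step(z, w, u, b):
    """z: (N, D) samples; w, u: (D,) parameters; b: scalar. Returns (f_z, logdet_jacobian)."""
    # enforce invertibility by replacing u with u_hat such that w^T u_hat >= -1 (appendix of [1]_):
    # u_hat = u + (m(w^T u) - w^T u) * w / ||w||^2, with m(x) = -1 + log(1 + e^x)
    wu = np.dot(w, u)
    u_hat = u + (-1.0 + np.log1p(np.exp(wu)) - wu) * w / np.dot(w, w)
    a = np.dot(z, w) + b                                # (N,) pre-activations w^T z + b
    f_z = z + np.outer(np.tanh(a), u_hat)               # (N, D) f(z) = z + u_hat * tanh(w^T z + b)
    psi = np.outer(1.0 - np.tanh(a)**2, w)              # (N, D) psi(z) = h'(w^T z + b) * w
    logdet_jacobian = np.log(np.abs(1.0 + np.dot(psi, u_hat)))  # (N,) log |det df/dz|
    return f_z, logdet_jacobian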
# config
mode = 'linear' # 'linear' or 'planar'
#mode = 'planar' # 'linear' or 'planar'
norm_p_z = False # for mode='linear' only
use_annealed_loss = False # annealed free energy, see paper
nflows = 2
batch_size = 100
if mode == 'linear':
    nparam_updates = 20000
elif mode == 'planar':
    nparam_updates = 100000  # 500000
report_every = 1000
nepochs = nparam_updates // report_every # total num. of 'epochs'
lr = 1.0e-5
momentum = 0.9
iw_samples = 1 # number of samples for Monte Carlo approximation of expectation over q0(z)
# note: because we aren't using any actual data in this case, this has the same effect as multiplying batch_size
class NormalizingLinearFlowLayer(lasagne.layers.Layer):
    """
    Normalizing flow layer with transform f(z) = mu + sigma*z.
    Ensures the constraint sigma > 0 by reparameterizing it as log_var.
    This flow transformation is not very powerful as a general transformation;
    it is mainly intended for testing purposes.
    """
    def __init__(self, incoming,
                 mu=lasagne.init.Normal(),
                 log_var=lasagne.init.Normal(), **kwargs):
        super(NormalizingLinearFlowLayer, self).__init__(incoming, **kwargs)
        ndim = int(np.prod(self.input_shape[1:]))
        self.mu = self.add_param(mu, (ndim,), name="mu")
        self.log_var = self.add_param(log_var, (ndim,), name="log_var")

    def get_output_shape_for(self, input_shape):
        return input_shape

    def get_output_for(self, input, **kwargs):
        z = input
        sigma = T.exp(0.5*self.log_var)
        f_z = self.mu + sigma*z
        logdet_jacobian = T.sum(0.5*self.log_var)
        return [f_z, logdet_jacobian]
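# Worked Jacobian for the linear flow above: f(z) = mu + sigma*z has Jacobian J = diag(sigma), so
# log|det J| = sum_i log sigma_i = sum_i 0.5*log_var_i, which is the (input-independent) scalar
# returned as logdet_jacobian by get_output_for() above.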
def U_z(z):
    """Test energy function U(z)."""
    if mode == 'linear':
        import math
        if norm_p_z:
            # normalized; in this case KL divergence should be >= 0,
            # at least if batch_size*iw_samples is sufficiently large to have a good Monte Carlo approximation of expectation
            c = - 0.5 * math.log(2*math.pi)
        else:
            # not normalized; in this case KL divergence may be negative (or never reach a value close to zero)
            c = 0
        mu = np.array([0.7, -1.5]).astype(theano.config.floatX)
        log_var = np.log(np.array([2.3, 0.3])).astype(theano.config.floatX)
        log_p = c - log_var/2 - (z - mu)**2 / (2*T.exp(log_var))
        return -T.sum(log_p, axis=1)
    elif mode == 'planar':
        z1 = z[:, 0]
        z2 = z[:, 1]
        return 0.5*((T.sqrt(z1**2 + z2**2) - 2)/0.4)**2 - T.log(T.exp(-0.5*((z1 - 2)/0.6)**2) + T.exp(-0.5*((z1 + 2)/0.6)**2))
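# note: with mode='linear' the target is the diagonal Gaussian N(z; [0.7, -1.5], diag([2.3, 0.3]))
# defined above; with mode='planar' it is the ring-shaped test energy from table 1 of [1]_, which
# concentrates mass on a ring of radius 2 with two modes near z1 = +/-2.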
def evaluate_bivariate_pdf(p_z, range, npoints, z_sym=None):
    """Evaluate (possibly unnormalized) pdf over a meshgrid."""
    side = np.linspace(range[0], range[1], npoints)
    z1, z2 = np.meshgrid(side, side)
    z = np.hstack([z1.reshape(-1, 1), z2.reshape(-1, 1)])
    if z_sym is None:
        # p_z is a callable that builds the expression from a fresh symbolic input
        z_sym = T.matrix('z')
        p_z_ = p_z(z_sym)
    else:
        # p_z is already a symbolic expression in terms of z_sym
        p_z_ = p_z
    p_z_fun = theano.function([z_sym], [p_z_], allow_input_downcast=True)
    return z1, z2, p_z_fun(z)[0].reshape((npoints, npoints))
def evaluate_bivariate_pdf_no_comp(p_z_fun, range, npoints):
    """Like evaluate_bivariate_pdf, but for an already-compiled Theano function p_z_fun."""
    side = np.linspace(range[0], range[1], npoints)
    z1, z2 = np.meshgrid(side, side)
    z = np.hstack([z1.reshape(-1, 1), z2.reshape(-1, 1)])
    return z1, z2, p_z_fun(z)[0].reshape((npoints, npoints))
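# These two helpers are used below to rasterize the (unnormalized) target density e^{-U(z)}, the
# initial density q_0(z) and the transformed density q_K(z) over the square [-4, 4]^2 for plotting.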
# main script
np.random.seed(1234)
z0 = T.matrix('z0')
l_in = lasagne.layers.InputLayer((None, 2), input_var=z0)
if mode == 'linear':
    l_nf = NormalizingLinearFlowLayer(l_in, name='NF')
    l_zk = ListIndexLayer(l_nf, index=0)
    l_logdet_J_list = [ListIndexLayer(l_nf, index=1)]
elif mode == 'planar':
    l_logdet_J_list = []
    l_zk = l_in
    # ->
    # also learn initial mapping from standard gaussian to gaussian with mean and diagonal covariance
    #l_nf = NormalizingLinearFlowLayer(l_zk, name='NF_0')
    #l_zk = ListIndexLayer(l_nf, index=0)
    #l_logdet_J_list += [ListIndexLayer(l_nf, index=1)]
    # <-
    for k in xrange(nflows):
        l_nf = NormalizingPlanarFlowLayer(l_zk, name='NF_{:d}'.format(1+k))
        l_zk = ListIndexLayer(l_nf, index=0)
        l_logdet_J_list += [ListIndexLayer(l_nf, index=1)]
train_out = lasagne.layers.get_output([l_zk] + l_logdet_J_list, z0, deterministic=False)
zK = train_out[0]
logdet_J_list = train_out[1:]
# loss function
log_q0_z0 = log_stdnormal(z0).sum(axis=1)
sum_logdet_J = 0
for logdet_J_k in logdet_J_list:
    sum_logdet_J += logdet_J_k
log_qK_zK = log_q0_z0 - sum_logdet_J
if use_annealed_loss:
    iteration = theano.shared(0)
    beta_t = T.minimum(1, 0.01 + iteration/10000)  # inverse temperature that goes from 0.01 to 1 after 10000 iterations
    # XXX: an 'iteration' is a parameter update, right?
    kl = T.mean(log_qK_zK + beta_t*U_z(zK), axis=0)
else:
    kl = T.mean(log_qK_zK + U_z(zK), axis=0)
loss = kl
# updates
params = lasagne.layers.get_all_params([l_zk], trainable=True)
print('Trainable parameters:')
for p in params:
    print(' {}: {}'.format(p, p.get_value().shape if p.get_value().shape != () else 'scalar'))
grads = T.grad(loss, params)
updates_rmsprop = lasagne.updates.rmsprop(grads, params, learning_rate=lr)
updates = lasagne.updates.apply_momentum(updates_rmsprop, params, momentum=momentum)
#updates = lasagne.updates.adam(grads, params, learning_rate=0.0001)
if use_annealed_loss:
    updates[iteration] = iteration + 1
# compile
print('Compiling...')
train_model = theano.function([z0], [loss], updates=updates, allow_input_downcast=True)
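# One possible way to address the TODO about printing E[log p(zK)] and E[log qK(zK)] during
# training (a sketch, not part of the original script): compile a separate diagnostics function
# from the symbolic graph built above. Since log p(zK) is only known up to the constant log Z,
# -E[U(zK)] would be reported instead.
#diag_model = theano.function([z0], [T.mean(log_qK_zK), T.mean(-U_z(zK))], allow_input_downcast=True)
#mean_log_qK_zK, mean_neg_U_zK = diag_model(np.random.normal(size=(batch_size*iw_samples, 2)))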
# train
print('Training...')
losses = []
epoch = 0
assert nparam_updates % report_every == 0, 'should be integer multiple'
for k in xrange(nparam_updates):
    z0_ = np.random.normal(size=(batch_size*iw_samples, 2))
    loss = train_model(z0_)[0]
    losses += [loss]
    if (1+k) % report_every == 0:
        print('{:d}/{:d}: loss = {:.6f}'.format(1+epoch, nepochs, np.mean(losses)))
        if any(np.isnan(losses)):
            print('NaN loss, aborting...')
            break
        for p in params:
            print(' {}:\t{}'.format(p, p.get_value()))
        losses = []
        epoch += 1
# plot
fig = plt.figure()
ax = plt.subplot(1, 4, 1, aspect='equal')
z1, z2, phat_z = evaluate_bivariate_pdf(lambda z: T.exp(-U_z(z)), range=(-4, 4), npoints=200)
plt.pcolormesh(z1, z2, phat_z)
ax.get_xaxis().set_ticks([])
ax.get_yaxis().set_ticks([])
ax.invert_yaxis()
ax.set_title('$p(z)$')
ax = plt.subplot(1, 4, 2, aspect='equal')
z1, z2, q0_z0 = evaluate_bivariate_pdf(T.exp(log_q0_z0), range=(-4, 4), npoints=200, z_sym=z0)
plt.pcolormesh(z1, z2, q0_z0)
ax.get_xaxis().set_ticks([])
ax.get_yaxis().set_ticks([])
ax.invert_yaxis()
ax.set_title('$q_0(z)$')
ax = plt.subplot(1, 4, 3, aspect='equal')
# get transformed distribution qK(z) directly
# XXX: doesn't work (ie. shows the un-transformed distribution q0_z0 instead)
qK_zK = T.exp(log_qK_zK)
print('\nComputational graph before optimization:')
theano.printing.debugprint(qK_zK)
print('')
eval_model1 = theano.function([z0], [qK_zK], allow_input_downcast=True)
print('\nComputational graph after optimization:')
theano.printing.debugprint(eval_model1.maker.fgraph.outputs[0])
print('')
z1, z2, qK_zK_ = evaluate_bivariate_pdf_no_comp(eval_model1, range=(-4, 4), npoints=200)
ax.pcolormesh(z1, z2, qK_zK_)
ax.get_xaxis().set_ticks([])
ax.get_yaxis().set_ticks([])
ax.invert_yaxis()
ax.set_title('$q_K(z)$')
ax = plt.subplot(1, 4, 4, aspect='equal')
# get samples zK ~ qK(z) and plot those
# XXX: this DOES work
eval_model2 = theano.function([z0], [zK], allow_input_downcast=True)
N = 1000000 # take many samples; but will still look a little 'spotty'
z0_ = np.random.normal(size=(N, 2))
zK_ = eval_model2(z0_)[0]
ax.hist2d(zK_[:, 0], zK_[:, 1], range=[[-4, 4], [-4, 4]], bins=200)
ax.get_xaxis().set_ticks([])
ax.get_yaxis().set_ticks([])
ax.invert_yaxis()
ax.set_title('$z_K \sim q_K(z)$')
fig.tight_layout()
plt.show()
print('Done :)')