Mixture of Gaussians variational morphing similar to Fig 1c of Saul, L.K. and Jordan, M.I., 1997
"""
Code for Figure 1c of
Saul, L.K. and Jordan, M.I., 1997. A variational principle for model-based morphing.
In Advances in Neural Information Processing Systems (pp. 267-273).
Note not exactly the same setup and do not know settings of ell and variance to use
Also not sure how to deal with the change points
Written in a bit of a rush and not thoroughly tested so use with caution
"""
import numpy as np
import matplotlib.pyplot as plt
from scipy import linalg as sla
from scipy.integrate import odeint
from scipy.optimize import fsolve
NUM_DIM = 2
# TODO: if using different probabilities for the clusters then ell should be adjusted
colors = ['#1f77b4',
'#ff7f0e',
'#2ca02c',
'#d62728',
'#9467bd',
'#8c564b',
'#e377c2',
'#7f7f7f',
'#bcbd22',
'#17becf']
def func(x_concat, t, precision, ell, mean):
    # ODE defined in equation 9 of the paper. The state vector packs position and velocity
    # together as [x, v]; we return their time derivatives [dx/dt, dv/dt].
    x = x_concat[:NUM_DIM][:, None] - mean[:, None]
    v = x_concat[NUM_DIM:][:, None]
    M = precision
    a_coeff = 0.5 * (x.T @ M @ x) + ell
    new_a = (x - v * (x.T @ M @ v)) / a_coeff
    derivs = np.vstack((v, new_a))
    return np.squeeze(derivs)
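# Illustrative sketch only: integrating the ODE above around a single standard Gaussian.
# The _example_* names and values below are made up for this sketch and are not settings
# from the paper; the actual solve happens in PathOptimizer._solve_qt_around_one_gaussian.
#
#   _example_prec = np.eye(NUM_DIM)                # unit isotropic precision
#   _example_mean = np.zeros(NUM_DIM)              # component centred at the origin
#   _example_state0 = np.array([1., 0., 0., 1.])   # start at (1, 0) with velocity (0, 1)
#   _example_ts = np.linspace(0., 10., 50)
#   _example_sol = odeint(func, _example_state0, _example_ts,
#                         args=(_example_prec, 10., _example_mean))
#   _example_positions = _example_sol[:, :NUM_DIM]  # the path itself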
class Gaussian:
    def __init__(self, mean, precision):
        self.mean = mean
        self.precision = precision
        self._det_prec = sla.det(precision)
        self._ndim = mean.size

    # TODO: pdf and log_pdf are very inefficient when evaluating more than one point
    # (they build a full N x N matrix and take its diagonal). Move to using scipy.stats.
    def pdf(self, x_points):
        if len(x_points.shape) == 1:
            x_points = x_points[:, None]
            mean = self.mean[:, None]
            first_factor = self._det_prec ** 0.5 / ((2 * np.pi) ** (self._ndim / 2.))
            second_factor = np.exp(-0.5 * (x_points - mean).T @ self.precision @ (x_points - mean))
        else:
            mean = self.mean[None, :]
            first_factor = self._det_prec ** 0.5 / ((2 * np.pi) ** (self._ndim / 2.))
            second_factor = np.diagonal(np.exp(-0.5 * (x_points - mean) @ self.precision @ (x_points - mean).T))[:, None]
        return first_factor * second_factor

    def log_pdf(self, x_points):
        if len(x_points.shape) == 1:
            x_points = x_points[:, None]
            mean = self.mean[:, None]
            third_comp = -0.5 * (x_points - mean).T @ self.precision @ (x_points - mean)
        else:
            mean = self.mean[None, :]
            third_comp = np.diagonal(-0.5 * (x_points - mean) @ self.precision @ (x_points - mean).T)[:, None]
        first_comp = 0.5 * np.log(self._det_prec)
        second_comp = -self._ndim / 2. * np.log(2 * np.pi)
        return first_comp + second_comp + third_comp
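# Illustrative sketch only (made-up numbers): evaluating a Gaussian at a single point and
# at a batch of points. A 1-D input uses the single-point branch above, a 2-D input the
# batched branch.
#
#   _example_g = Gaussian(np.zeros(NUM_DIM), np.eye(NUM_DIM))
#   _example_g.pdf(np.array([0., 0.]))                   # density at the mean
#   _example_g.log_pdf(np.array([[0., 0.], [1., 1.]]))   # (2, 1) array of log densities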
class PathOptimizer:
    def __init__(self, x_0, x_1, mog):
        self.x_0 = x_0
        self.x_1 = x_1
        self.mog = mog
        self.ell = 10
        self.q_t = _interp_2d(x_0, x_1, 20)
        self.z_t = self.solve_for_zt()
        self.max_iter = 75
    def solve_for_zt(self):
        """
        Assigns a cluster to each point on the path via Bayes' rule: with equal mixing
        weights this is simply the component with the highest log density at that point.
        """
        zt = []
        for i in range(self.q_t.shape[0]):
            x = self.q_t[i, :]
            ll = [g.log_pdf(x) for g in self.mog]
            cluster_assoc = np.argmax(ll)
            zt.append(cluster_assoc)
        return np.array(zt)
    def solve_for_qt(self):
        new_qts = []

        def recurse_next(new_qts, current_qts, current_zts, x_start, mog_id_start):
            if len(current_qts) == 0:
                return  # recursed down to the bottom
            for t, zt in enumerate(current_zts):
                if zt != mog_id_start:
                    if t == 0:
                        # this section of the path was only one point, so skip straight to the next section
                        recurse_next(new_qts, current_qts[t + 1:], current_zts[t + 1:], x_start, zt)
                        return
                    break
                else:
                    x_end = current_qts[t]
            path = self._solve_qt_around_one_gaussian(x_start, x_end, self.mog[mog_id_start])
            # Next time in this function we move onto the next section.
            next_qts = current_qts[t + 1:]
            next_zts = current_zts[t + 1:]
            # We deliberately do not take the whole path and instead stop one point before the
            # end. This seems to converge better; if we do not do this then the change points
            # stay fairly static. Not sure whether this is the best way to deal with the change
            # points, but it gives results similar to the paper.
            if len(next_qts) == 0:
                new_qts.append(path[:, :NUM_DIM])
            else:
                new_qts.append(path[:-1, :NUM_DIM])
                x_end = path[-2, :NUM_DIM]
            recurse_next(new_qts, next_qts, next_zts, x_end, zt)

        recurse_next(new_qts, self.q_t, self.z_t, self.x_0, self.z_t[0])
        new_qts_array = np.concatenate(new_qts, axis=0)
        return new_qts_array
    def _solve_qt_around_one_gaussian(self, x_start, x_end, gaussian):
        # Solve the path for one of the MoG components using the shooting method: treat the
        # boundary value problem (fixed x_start and x_end) as an initial value problem and
        # use fsolve to find the initial velocity that lands the path on x_end.
        precision = gaussian.precision
        mean = gaussian.mean
        tspan = np.linspace(0, 10., 50)

        def solutions_func(v_start_guess):
            U = odeint(func, np.concatenate((x_start, v_start_guess)), tspan,
                       args=(precision, self.ell, mean))
            u1 = U[:, :NUM_DIM]
            return u1[-1] - x_end

        vstart = fsolve(solutions_func, x_end)
        x0 = np.array(np.concatenate((x_start, vstart)), dtype=np.float32)
        sol = odeint(func, x0, tspan, args=(precision, self.ell, mean))
        return sol
    def iterate_and_solve_via_paper_approach(self):
        qts = [self.q_t]
        zts = [self.z_t]
        for i in range(self.max_iter):
            # First solve for the new path route.
            new_qts = self.solve_for_qt()
            self.q_t = new_qts
            qts.append(self.q_t)
            # And then solve for which cluster each point on the route belongs to.
            new_zts = self.solve_for_zt()
            self.z_t = new_zts
            zts.append(new_zts)
            print("Iteration {} finished".format(i))
        return qts, zts
def _create_isotropic_precision(var):
    return 1 / var * np.eye(NUM_DIM, dtype=float)


def _interp_2d(x_0, x_1, num):
    interps = np.array([np.linspace(a, b, num) for a, b in zip(x_0, x_1)]).T
    return interps
def plot(qts, zts, mog):
    X, Y = np.meshgrid(np.linspace(-6, 6, 50), np.linspace(2, 8, 50))
    orig_shape = X.shape
    for j, (path_qt, path_zt) in enumerate(zip(qts, zts)):
        fig, ax = plt.subplots(figsize=(7, 9))
        for i, gaussian in enumerate(mog):
            z = gaussian.pdf(np.concatenate((X.reshape(-1, 1), Y.reshape(-1, 1)), axis=1))
            Z = z.reshape(*orig_shape)
            c = colors[i]
            ax.contour(X, Y, Z, colors=c, alpha=0.5)
            ax.plot(path_qt[path_zt == i, 0], path_qt[path_zt == i, 1], 'x', color=c)
            ax.plot(gaussian.mean[0], gaussian.mean[1], "*", color=c, ms=10)
        ax.plot(path_qt[0, 0], path_qt[0, 1], 'o', color='k', ms=10)
        ax.plot(path_qt[-1, 0], path_qt[-1, 1], 'o', color='k', ms=10)
        ax.set_title("Iteration {}".format(j))
        if j % 5 == 0 or j < 10:
            plt.savefig("iter_{}.png".format(j))
def main():
    mog = [
        Gaussian(np.array([-6., 3.5], dtype=float), _create_isotropic_precision(1.)),
        Gaussian(np.array([-5., 6.2], dtype=float), _create_isotropic_precision(1.)),
        Gaussian(np.array([-2.5, 8], dtype=float), _create_isotropic_precision(1.)),
        Gaussian(np.array([2.5, 8], dtype=float), _create_isotropic_precision(1.)),
        Gaussian(np.array([5., 6.2], dtype=float), _create_isotropic_precision(1.)),
    ]
    x_0 = np.array([-6, 4.5], dtype=float)
    x_1 = np.array([6., 4.5], dtype=float)
    path_optimizer = PathOptimizer(x_0, x_1, mog)
    qts, zts = path_optimizer.iterate_and_solve_via_paper_approach()
    plot(qts, zts, mog)


if __name__ == '__main__':
    main()