Skip to content

Instantly share code, notes, and snippets.

@wohlert
Created June 22, 2018 13:12
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wohlert/07f6c36ac275fa19af409bf757e5d190 to your computer and use it in GitHub Desktop.
Save wohlert/07f6c36ac275fa19af409bf757e5d190 to your computer and use it in GitHub Desktop.
Hamiltonian Monte Carlo
import numpy as np
class HamiltonianMC(object):
"""
Hamiltonian/hybrid Monte Carlo
Uses Hamiltonian dynamics to make proposals using
kinetic energy of the proposal. Employs leapfrog
integration for time-reversibility and to ensure
detailed balance.
"""
def __init__(self, energy, integrator):
self.energy = energy
self.proposal = integrator
def step(self, x0, chain_length, return_trace=True):
"""
Perform `chain_length` steps of HMC sampling.
x0: initial state
chain_length: number of samples to take
return_trace: whether to return a trace containing
information about the run, or to just return samples.
The trace contains (samples, acceptance rate, trajectories).
"""
d, k = len(x0), self.proposal.steps
samples = np.tile(x0, (chain_length+1, 1))
trajectories = np.zeros(((chain_length+1)*k, d))
acceptance_rate = 0
for i in tqdm(range(1, chain_length+1)):
# Sample a random momentum
x0 = samples[i-1]
m0 = np.random.multivariate_normal(np.zeros(d), np.eye(d))
# Save variables for comparison
x = x0.copy()
m = m0.copy()
# Send x on a trip through the landscape
trajectories[(i*k):(i+1)*k] = self.proposal(x, m)
mass = 1
# Hamiltonian = kinetic + potential (total energy)
hamiltonian_x = self.energy(x) + np.dot(x, x)/(2*mass)
hamiltonian_x0 = self.energy(x0) + np.dot(x0, x0)/(2*mass)
if np.random.rand() < min(np.exp(hamiltonian_x0 - hamiltonian_x), 1):
acceptance_rate += 1
else:
x = x0
samples[i] = x
if return_trace:
return {"samples": samples, "acceptance_rate": acceptance_rate / chain_length,
"trajectories": trajectories.reshape(-1, k, d)}
return samples
class LeapfrogIntegrator(object):
def __init__(self, gradient, steps=10, epsilon=0.1):
self.gradient = gradient
self.steps = steps
self.epsilon = epsilon
def __call__(self, position, momentum):
""" Alters variables inplace! """
trajectories = np.tile(position, (self.steps, 1))
for i in range(1, self.steps):
momentum -= 0.5 * self.epsilon * self.gradient(position)
position += self.epsilon * momentum
momentum -= 0.5 * self.epsilon * self.gradient(position)
trajectories[i] = position
return trajectories
from leapfrog import LeapfrogIntegrator
from hmc import HamiltonianMC
class Gaussian(object):
"""
We treat this Gaussian distribution as our
unnormalised posterior distribution.
"""
def __init__(self, mu, sigma):
self.mu = mu
self.sigma = sigma
def energy(self, x):
logpdf = -0.5 * (np.log(2*pi) + np.log(self.sigma**2) + 1/self.sigma**2 * (x - self.mu)**2)
return -logpdf
def gradient(self, x):
return -(x - self.mu)/self.sigma**2
model = Gaussian(5, 3)
integrator = LeapfrogIntegrator(model.gradient, steps=25, epsilon=0.01)
sampler = HamiltonianMC(model.energy, integrator)
# Draw 1000 samples from the posterior distribution
x0 = np.random.normal(0, 1, 2)
samples = sampler.step(x0, 1000, return_trace=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment