Skip to content

Instantly share code, notes, and snippets.

@wgurecky
Last active October 2, 2018 23:02
Show Gist options
  • Save wgurecky/afac9a9de09bc1ecdb3d33f53fe9a60c to your computer and use it in GitHub Desktop.
Save wgurecky/afac9a9de09bc1ecdb3d33f53fe9a60c to your computer and use it in GitHub Desktop.
Metropolis MCMC example
import corner
import matplotlib.pyplot as pl
from matplotlib.ticker import MaxNLocator
def plot_mcmc_params(samples, labels, savefig='corner_plot.png', truths=None):
fig = corner.corner(samples, labels=labels,
truths=truths, use_math_text=True)
fig.savefig(savefig)
def plot_mcmc_chain(samples, labels, savefig, truths):
pl.clf()
# count number of cols in samples
n_params = samples.shape[1]
fig, axes = pl.subplots(n_params, 1, sharex=True, figsize=(8, 9), squeeze=False)
for i in range(n_params):
axes[i, 0].plot(samples[:, i].T, color="k", alpha=0.6)
axes[i, 0].yaxis.set_major_locator(MaxNLocator(5))
axes[i, 0].axhline(truths[i], color="#888888", lw=2)
axes[i, 0].set_ylabel(labels[i])
fig.tight_layout(h_pad=0.0)
fig.savefig(savefig)
###
# author: William Gurecky
# license: MIT
# notes: Implementation of MCMC algos for educational purposes.
###
from __future__ import print_function, division
from six import iteritems
import numpy as np
import scipy.stats as stats
import mc_plot
from copy import deepcopy
class McmcProposal(object):
def __init__(self):
pass
def update_proposal_cov(self):
raise NotImplementedError
def sample_proposal(self, n_samples=1):
raise NotImplementedError
def prob_ratio(self):
raise NotImplementedError
class DrChain(object):
def __init__(self, ln_like_fn, **kwargs):
self.current_depth = 1
self.dr_sample_chain = []
self.dr_prop_chain = []
self.max_depth = kwargs.get("max_depth", 5)
self.ln_like_fn = ln_like_fn
def run_dr_chain(self, dr_seed_chain, prop_base_cov):
"""!
Run the delayed rejection (DR) chain. break
early if we happen to accept a sample.
@prop_base_cov covarience of the base MCMC proposal
"""
assert len(dr_seed_chain) == 2
#param lambda_0 original location of MCMC chain before DR
#param beta_0 proposed, but rejected sample from original MCMC chain before DR
lambda_0 = dr_seed_chain[0]
beta_1 = dr_seed_chain[1]
self.dr_sample_chain = deepcopy(dr_seed_chain)
for i in range(self.max_depth):
depth = i + 1
# get a proposal density distribution based on all previous
# samples in the dr chain.
self.dr_prop_chain.append(DrGaussianProposal(self.ln_like_fn,
prop_base_cov))
new_prop_dist = self.dr_prop_chain[-1]._wrapped_mv_gausasin(dr_sample_chain)
# sample the proposal density distribution
pass
class DrGaussianProposal(McmcProposal):
"""!
@brief Delayed rejection gaussian proposal chain
with proposal shrinkage. The proposal density of a delayed chain
is updated with knowlege of other past proposed steps in the DR
chain. The simplest solution is to adjust the mean to the average
of all past attempted steps in the DR chain and adopt a covariance
which shinks as one tries more and more steps in the DR chain.
"""
def __init__(self, ln_like_fn, base_cov):
self.ln_like_fn = ln_like_fn
self.depth = 1
self.cov_base = base_cov
super(DrGaussianProposal, self).__init__()
def multi_stage_proposal_dist(self, prop_lambda, prop_past_beta):
"""!
@brief Compute
\beta_i \sim g_i(\beta_i | \lambda, \beta_1, ... \beta_{i-1})
"""
pass
def prob_ratio(self, *args, **kwargs):
return np.exp(self.ln_prob_ratio(*args, **kwargs))
def ln_prob_ratio(self, *args, **kwargs):
pass
def _ln_likelihood_ratio(self, beta_past, beta_proposed):
past_likelihood = self.ln_like_fn(beta_past)
proposed_likelihood = self.ln_like_fn(beta_proposed)
return proposed_likelihood - past_likelihood
def _ln_proposal_ratio(self, beta_proposal, beta_dr_list):
beta_dr_list= []
numerator = ()
pass
def _wrapped_mv_gausasin(self, beta_dr_list):
shrinkage_array = [1.0, 0.9, 0.8, 0.7, 0.6, 0.5]
mu = np.mean(beta_dr_list)
cov = self.cov_base * shrinkage_array[len(beta_dr_list)]
return stats.multivariate_normal(mean=mu, cov=cov)
def _sample_wrapped_mv_gaussian(self, n):
pass
def _ln_acceptance_ratio(self):
pass
class GaussianProposal(McmcProposal):
def __init__(self, mu=None, cov=None):
"""!
@brief Init
@param mu np_1darray. centroid of multi-dim gauss
@param cov np_ndarray. covariance matrix
"""
self._cov_init = False
self._mu = mu
self._cov = cov
super(GaussianProposal, self).__init__()
def update_proposal_cov(self, past_samples, rescale=0, verbose=0):
"""!
@brief fit gaussian proposal to past samples vector.
The approximate optimal gaussian proposal distribution is:
\f[
\mathcal N(x, (2.38^2)\Sigma_n / d
\f]
(ref: http://probability.ca/jeff/ftpdir/adaptex.pdf)
Where d is the dimension and \f$ \Sigma_n \f$ is the cov of the past samples
in the chain at stage n. Take x to be the current location of the
chain. In reality the proposal cov should be updated or
adapted such that an
acceptible acceptance ratio is reached... around 0.234
"""
self._cov = np.cov(past_samples.T)
# rescale cov matrix
if rescale > 0:
self._cov /= np.max(np.abs(self._cov)) / rescale
if not self._cov.shape:
self._cov = np.reshape(self._cov, (1, 1))
# prevent possiblity of sigular cov matrix
if not self._cov_init:
self._cov += np.eye(self._cov.shape[0]) * 1e-8
self._cov_init = True
if verbose:
print("New proposal cov = %s" % str(self._cov))
def sample_proposal(self, n_samples=1):
"""!
@brief Sample_proposal distribution
"""
assert self.mu is not None
assert self.cov is not None
return np.random.multivariate_normal(self.mu, self.cov, size=n_samples)[0]
def prob_ratio(self, ln_like_fn, theta_past, theta_proposed):
"""!
@brief evaluate probability ratio:
\f[
\frac{\Pi(x^i)}{\Pi(x^{i-1})} \frac{g(x^{i-1}|x^i)}{g(x^i| x^{i-1})}
\f]
Where \f$ g() \f$ is the proposal distribution fn
and \f[ \Pi \f] is the likelyhood function
"""
return np.exp(self.ln_prob_ratio(ln_like_fn, theta_past, theta_proposed))
def ln_prob_ratio(self, ln_like_fn, theta_past, theta_proposed):
"""!
@brief evaluate log of probability ratio
\f[
ln(\frac{\Pi(x^i)}{\Pi(x^{i-1})} \frac{g(x^{i-1}|x^i)}{g(x^i| x^{i-1})})
\f]
Where \f$ g() \f$ is the proposal distribution fn
and \f[ \Pi \f] is the likelyhood function.
"""
assert self.mu is not None
assert self.cov is not None
g_ratio = lambda x_0, x_1: \
stats.multivariate_normal.pdf(x_0, mean=theta_past, cov=self.cov) - \
stats.multivariate_normal.pdf(x_1, mean=theta_proposed, cov=self.cov)
g_r = g_ratio(theta_proposed, theta_past) # should be 1 in symmetric case
assert g_r == 0
past_likelihood = ln_like_fn(theta_past)
proposed_likelihood = ln_like_fn(theta_proposed)
return proposed_likelihood - past_likelihood + g_r
@property
def mu(self):
return self._mu
@mu.setter
def mu(self, mu):
self._mu = mu
@property
def cov(self):
return self._cov
@cov.setter
def cov(self, cov):
self._cov = cov
class McmcSampler(object):
"""!
@brief Markov Chain Monte Carlo (MCMC) base class.
Contains common methods for dealing with the likelyhood function
and for obtaining parameter estimates from the mcmc chain.
"""
def __init__(self, log_like_fn, proposal, **proposal_kwargs):
"""!
@brief setup mcmc sampler
@param log_like_fn callable. must return log likelyhood (float)
@param proposal string.
@param proposal_kwargs dict.
"""
self.log_like_fn = log_like_fn
if proposal == 'Gauss':
self.mcmc_proposal = GaussianProposal(proposal_kwargs)
else:
raise RuntimeError("ERROR: proposal type: %s not supported." % str(proposal))
self.chain = None
self.n_accepted = 1
self.n_rejected = 0
self.chain = None
def _freeze_ln_like_fn(self, **kwargs):
"""!
@brief Freezes the likelyhood function.
the log_like_fn should have signature:
self.log_like_fn(theta, data=np.array([...]), **kwargs)
and must return the log likelyhood
"""
self._frozen_ln_like_fn = lambda theta: self.log_like_fn(theta, **kwargs)
def param_est(self, n_burn):
"""!
@brief Computes mean an std of sample chain discarding the first n_burn samples.
@return mean (np_1darray), std (np_1darray), chain (np_ndarray)
"""
chain_slice = self.chain[n_burn:, :]
mean_theta = np.mean(chain_slice, axis=0)
std_theta = np.std(chain_slice, axis=0)
return mean_theta, std_theta, chain_slice
def run_mcmc(self, n, theta_0, **kwargs):
"""!
@brief Run the mcmc chain
@param n int. number of samples in chain to draw
@param theta_0 np_1darray of initial guess
"""
self._mcmc_run(n, theta_0, **kwargs)
def _mcmc_run(self, *args, **kwarags):
"""! @brief Run the mcmc_chain. Must be overridden."""
raise NotImplementedError
@property
def acceptance_fraction(self):
"""!
@brief Ratio of accepted samples vs total samples
"""
return self.n_accepted / (self.n_accepted + self.n_rejected)
@property
def current_pos(self):
return self.chain[-1, :]
def mh_kernel(i, mcmc_sampler, theta_chain, verbose=0):
"""!
@brief Metropolis-Hastings mcmc kernel.
The kernel, \f[ K \f], maps the pevious chain state, \f[ x \f],
to the new chain state, \f[ x' \f]. In matrix form:
\f[
K x = x'
\f]
In order for repeated application of an MCMC kernel to converge
to the correct posterior distribution \f[\Pi()\f],
it is sufficient but not neccissary to obey detailed balance:
\f[
K(x^i|x^{i-1})\Pi(x^{i}) = K(x^{i-1}|x^{i})\Pi(x^{i-1})
\f]
or
\f[
x_i K_{ij} = x_j K_{ji}
\f]
This means that a step in the chain is reversible.
The goal in MCMC is to find \f[ K \f] that makes the state vector, \f[ x \f]
become stationary at the desired distribution \f[ \Pi() \f]
@param i int. Current chain index
@param mcmc_sampler McmcSampler instance
@param theta_chain np_ndarray for sample storage
@param verbose int or bool. default == 0 (optional)
"""
theta = theta_chain[i, :]
# set the gaussian proposal to be centered at current loc
mcmc_sampler.mcmc_proposal.mu = theta
# gen random test value
a_test = np.random.uniform(0, 1, size=1)
# propose a new place to go
theta_prop = mcmc_sampler.mcmc_proposal.sample_proposal()
# compute acceptance ratio
a_ratio = np.min((1, mcmc_sampler.mcmc_proposal.prob_ratio(
mcmc_sampler._frozen_ln_like_fn,
theta,
theta_prop)))
if a_ratio >= 1.:
# accept proposal, it is in area of higher prob density
theta_chain[i+1, :] = theta_prop
mcmc_sampler.n_accepted += 1
if verbose: print("Aratio: %f, Atest: %f , Accepted bc Aratio > 1" % (a_ratio, a_test))
elif a_test < a_ratio:
# accept proposal, even though it is "worse"
theta_chain[i+1, :] = theta_prop
mcmc_sampler.n_accepted += 1
if verbose: print("Aratio: %f, Atest: %f , Accepted by chance" % (a_ratio, a_test))
else:
# stay put, reject proposal
theta_chain[i+1, :] = theta
mcmc_sampler.n_rejected += 1
if verbose: print("Aratio: %f, Atest: %f , Rejected!" % (a_ratio, a_test))
return theta_chain
class Metropolis(McmcSampler):
"""!
@brief Metropolis Markov Chain Monte Carlo (MCMC) sampler.
Proposal distribution is gaussian and symetric
"""
def __init__(self, log_like_fn, **proposal_kwargs):
proposal = 'Gauss'
super(Metropolis, self).__init__(log_like_fn, proposal, **proposal_kwargs)
def _mcmc_run(self, n, theta_0, cov_est=5.0, **kwargs):
"""!
@brief Run the metropolis algorithm.
@param n int. number of samples to draw.
@param theta_0 np_1darray. initial guess for parameters.
@param cov_est float or np_1darray. Initial guess of anticipated theta variance.
strongly recommended to specify, but is optional.
"""
verbose = kwargs.get("verbose", 0)
self._freeze_ln_like_fn(**kwargs)
# pre alloc storage for solution
self.n_accepted = 1
self.n_rejected = 0
theta_chain = np.zeros((n, np.size(theta_0)))
self.chain = theta_chain
theta_chain[0, :] = theta_0
self.mcmc_proposal.cov = np.eye(len(theta_0)) * cov_est
for i in range(n - 1):
# M-H Kernel
mh_kernel(i, self, theta_chain, verbose=verbose)
self.chain = theta_chain
class AdaptiveMetropolis(McmcSampler):
"""!
@brief Metropolis Markov Chain Monte Carlo (MCMC) sampler.
"""
def __init__(self, log_like_fn, **proposal_kwargs):
proposal = 'Gauss'
super(AdaptiveMetropolis, self).__init__(log_like_fn, proposal, **proposal_kwargs)
def _mcmc_run(self, n, theta_0, cov_est=5.0, **kwargs):
"""!
@brief Run the metropolis algorithm.
@param n int. number of samples to draw.
@param theta_0 np_1darray. initial guess for parameters.
@param cov_est float or np_1darray. Initial guess of anticipated theta variance.
strongly recommended to specify, but is optional.
@param adapt int. Sample index at which to begin adaptively updating the
proposal distribution (default == 200)
@param lag int. Number of previous samples to use for proposal update (default == 100)
@param lag_mod. Number of iterations to wait between updates (default == 1)
"""
verbose = kwargs.get("verbose", 0)
adapt = kwargs.get("adapt", 1000)
lag = kwargs.get("lag", 1000)
lag_mod = kwargs.get("lag_mod", 100)
self._freeze_ln_like_fn(**kwargs)
# pre alloc storage for solution
self.n_accepted = 1
self.n_rejected = 0
theta_chain = np.zeros((n, np.size(theta_0)))
self.chain = theta_chain
theta_chain[0, :] = theta_0
self.mcmc_proposal.cov = np.eye(len(theta_0)) * cov_est
for i in range(n - 1):
# M-H Kernel
mh_kernel(i, self, theta_chain, verbose=verbose)
# continuously update the proposal distribution
# if (lag > adapt):
# raise RuntimeError("lag must be smaller than adaptation start index")
if i >= adapt and (i % lag_mod) == 0:
if verbose: print(" Updating proposal cov at sample index = %d" % i)
current_chain = theta_chain[:i, :]
self.mcmc_proposal.update_proposal_cov(current_chain[-lag:, :], verbose=verbose)
self.chain = theta_chain
class McmcChain(object):
"""!
@brief A simple markov chain with some helper functions.
"""
def __init__(self, theta_0, varepsilon=1e-6, mpi_comm=None, mpi_rank=None):
self.mpi_comm, self.mpi_rank = mpi_comm, mpi_rank
self._dim = len(theta_0)
theta_0 = np.asarray(theta_0) + self.var_ball(varepsilon, self._dim)
# init the 2d array of shape (iteration, <theta_vec>)
self._chain = np.array([theta_0])
@staticmethod
def var_ball(varepsilon, dim):
# tight gaussian ball
var_epsilon = 0.
if varepsilon > 0:
var_epsilon = np.random.multivariate_normal(np.zeros(dim),
np.eye(dim) * varepsilon,
size=1)[0]
return var_epsilon
def set_t_kernel(self, t_kernel):
"""!
@brief Set valid transition kernel
"""
assert t_kernel.shape[1] == self._chain.shape[1]
assert t_kernel.shape[1] == t_kernel.shape[0] # must be square
self.t_kernel = t_kernel
def t_kernel_eig(self):
"""!
@brief Get eigenvals of transition kernel
"""
return np.linalg.eig(self.t_kernel)
def apply_t_kernel(self, apply_new_state=True):
new_state = np.dot(self.t_kernel, self._chain[:-1])
if apply_new_state:
self.append_sample(new_state)
return new_state
def append_sample(self, theta_new):
theta_new = np.asarray(theta_new)
assert theta_new.shape[0] == self.chain.shape[1]
self._chain = np.vstack((self._chain, theta_new))
def pop_sample(self):
self._chain = self._chain[:-1, :]
def auto_corr(self, lag):
"""!
@brief Compute auto correlation in the chain.
"""
pass
@property
def chain(self):
return self._chain
@property
def current_pos(self):
return self.chain[-1, :]
@property
def dim(self):
return self._dim
class DeMc(McmcSampler):
"""!
@breif Differential-evolution metropolis sampler (DE-MC) note: not DREAM.
Utilized multiple chains to crawl the parameter space more efficiently.
At each MCMC iteration chains can be updated using a simple mutation operator:
\f[
\theta^* = \theta_i + \gamma(\theta_a + theta_b) + \varepsilon
\f]
"""
def __init__(self, log_like_fn, n_chains=8, **proposal_kwargs):
assert n_chains >= 4
self.n_chains = n_chains
proposal = 'Gauss'
self.accepted_proposals = 0.
self.total_proposals = 0.
super(DeMc, self).__init__(log_like_fn, proposal, **proposal_kwargs)
def _mcmc_run(self, n, theta_0, varepsilon=1e-6, **kwargs):
# params for DE-MC algo
self.accepted_proposals = 0.
self.total_proposals = 0.
dim = len(theta_0)
gamma = kwargs.get("gamma", 2.38 / np.sqrt(2. * dim))
# initilize chains
self.am_chains = []
for i in range(self.n_chains):
self.am_chains.append(McmcChain(theta_0, varepsilon * kwargs.get("inflate", 1e1)))
# DE-MC algo
# for j in range(int(n / self.n_chains)):
j = 0
while j <= n:
for i, am_chain in enumerate(self.am_chains):
current_chain = self.am_chains[i]
# randomly select chain pair from chain pool
valid_pool_ids = np.delete(np.array(range(self.n_chains)), i)
mut_chain_ids = np.random.choice(valid_pool_ids, replace=False, size=2)
# get location of each mutation chain
mut_a_chain = self.am_chains[mut_chain_ids[0]]
mut_b_chain = self.am_chains[mut_chain_ids[1]]
# generate proposal vector
prop_vector = gamma * (mut_a_chain.current_pos - mut_b_chain.current_pos)
prop_vector += current_chain.current_pos
prop_vector += McmcChain.var_ball(varepsilon * 1e-3, dim)
# note the probability ratio can be computed in parallel
# accept or reject mutated chain loc
alpha = self._mut_prop_ratio(self.log_like_fn,
current_chain.current_pos,
prop_vector)
accept_bool = self.metropolis_accept(alpha)
if accept_bool:
current_chain.append_sample(prop_vector)
self.accepted_proposals += 1
else:
current_chain.append_sample(current_chain.current_pos)
self.total_proposals += 1
j += 1
@property
def acceptance_fraction(self):
return self.accepted_proposals / self.total_proposals
def param_est(self, n_burn):
chain_slice = self.super_chain[n_burn:, :]
mean_theta = np.mean(chain_slice, axis=0)
std_theta = np.std(chain_slice, axis=0)
return mean_theta, std_theta, chain_slice
@property
def super_chain(self):
return self._super_chain()
def _super_chain(self):
super_ch = np.zeros((self.n_chains * self.am_chains[0].chain.shape[0],
self.am_chains[0].chain.shape[1]))
for i, chain in enumerate(self.am_chains):
super_ch[i::len(self.am_chains), :] = chain.chain
return super_ch
def _mut_prop_ratio(self, log_like_fn, current_theta, mut_theta):
# metropolis ratio
alpha = np.min((1.0, np.exp(log_like_fn(mut_theta) - log_like_fn(current_theta))))
alpha = np.clip(alpha, 0.0, 1.0)
return alpha
@staticmethod
def metropolis_accept(alpha):
return np.random.choice([True, False], p=[alpha, 1. - alpha])
def fit_line():
"""!
@brief Example data from http://dfm.io/emcee/current/user/line/
For example/testing only.
"""
# Choose the "true" parameters.
m_true = -0.9594
b_true = 4.294
f_true = 0.534
# Generate some synthetic data from the model.
N = 50
x = np.sort(10 * np.random.rand(N))
yerr = 0.1 + 0.5 * np.random.rand(N)
y = m_true * x + b_true
y += np.abs(f_true * y) * np.random.randn(N)
y += yerr * np.random.randn(N)
def log_prior(theta):
if (-50 < theta[0] < 50) and (-50 < theta[1] < 50):
return 0.
else:
return -np.inf
def model_fn(theta):
return theta[0] + theta[1] * x
def log_like_fn(theta, data=y):
sigma = 1.0
log_like = -0.5 * (np.sum((data - model_fn(theta)) ** 2 / sigma \
- np.log(1./sigma)) + log_prior(theta))
return log_like
# init sampler
theta_0 = np.array([4.0, -0.5])
# my_mcmc = Metropolis(log_like_fn)
# my_mcmc = AdaptiveMetropolis(log_like_fn)
my_mcmc = DeMc(log_like_fn)
my_mcmc.run_mcmc(10000, theta_0, data=y, cov_est=np.array([[0.2, -0.3], [-0.3, 0.01]]))
# view results
theta_est, sig_est, chain = my_mcmc.param_est(6000)
print("Esimated params: %s" % str(theta_est))
print("Estimated params sigma: %s " % str(sig_est))
print("Acceptance fraction: %f" % my_mcmc.acceptance_fraction)
# vis the parameter estimates
mc_plot.plot_mcmc_params(chain,
labels=["$y_0$", "m"],
savefig='line_mcmc_ex.png',
truths=[4.294, -0.9594])
# vis the full chain
theta_est_, sig_est_, full_chain = my_mcmc.param_est(0)
mc_plot.plot_mcmc_chain(full_chain,
labels=["$y_0$", "m"],
savefig='lin_chain_ex.png',
truths=[4.294, -0.9594])
def sample_gauss():
"""! @brief Sample from a gaussian distribution """
mu_gold, std_dev_gold = 5.0, 0.5
def log_like_fn(theta, data=None):
return np.log(stats.norm.pdf(theta[0],
loc=mu_gold,
scale=std_dev_gold)) - log_prior(theta)
def log_prior(theta):
if -100 < theta[0] < 100:
return 0
else:
return -np.inf
# init sampler
theta_0 = np.array([1.0])
# my_mcmc = Metropolis(log_like_fn)
my_mcmc = DeMc(log_like_fn)
my_mcmc.run_mcmc(4000, theta_0, data=[0, 0, 0], cov_est=1.0)
# view results
theta_est, sig_est, chain = my_mcmc.param_est(200)
print("Esimated mu: %s" % str(theta_est))
print("Estimated sigma: %s " % str(sig_est))
print("Acceptance fraction: %f" % my_mcmc.acceptance_fraction)
# vis the parameter estimates
mc_plot.plot_mcmc_params(chain, ["$\mu$"], savefig='gauss_mu_mcmc_ex.png', truths=[5.0])
# vis the full chain
theta_est_, sig_est_, full_chain = my_mcmc.param_est(0)
mc_plot.plot_mcmc_chain(full_chain, ["$\mu$"], savefig='gauss_mu_chain_ex.png', truths=[5.0])
if __name__ == "__main__":
print("========== SAMPLE GAUSSI ===========")
sample_gauss()
print("========== FIT LIN MODEL ===========")
fit_line()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment