Skip to content

Instantly share code, notes, and snippets.

@apatil
Created February 24, 2011 18:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save apatil/842609 to your computer and use it in GitHub Desktop.
Save apatil/842609 to your computer and use it in GitHub Desktop.
Step methods that try to sample in 'isotropic position' using histories of their parameters
import pymc as pm
import numpy as np
# FIXME: Need to store duplicates too, when jumps are rejected. That means some mechanism
# for making sure the history is full-rank needs to be employed.
class HistoryCovarianceStepper(pm.StepMethod):
_state = ['n_points','history','tally','verbose']
def __init__(self, stochastic, n_points=None, init_history=None, verbose=None, tally=False):
if not np.iterable(stochastic) or isinstance(stochastic, pm.Variable):
stochastic = [stochastic]
pm.StepMethod.__init__(self, stochastic, verbose, tally)
# Initialization methods
self.check_type()
self.dimension()
self.n_points = n_points or max(self.dim, 20)
self.index = 0
if init_history is None:
self.history = np.empty((self.n_points, self.dim))
for i in xrange(self.n_points):
for s in self.stochastics:
self.history[i,self._slices[s]] = s._random(**s.parents.value)
else:
self.history = init_history
self.history_mean = np.mean(self.history, axis=0)
self._state = HistoryCovarianceStepper._state
def unstore(self):
index = (self.index-1)%self.n_points
self.history_mean += (self.last_value - self.history[index])/self.n_points
self.history[index] = self.last_value
def get_current_value(self):
current_value = np.empty(self.dim)
for s in self.stochastics:
current_value[self._slices[s]] = s.value
return current_value
def set_current_value(self, v):
for s in self.stochastics:
s.value = v[self._slices[s]]
def store(self, v):
self.last_value = self.history[self.index].copy()
self.history[self.index] = v
self.index = (self.index + 1) % self.n_points
self.history_mean += (v-self.last_value)/self.n_points
def check_type(self):
"""Make sure each stochastic has a correct type, and identify discrete stochastics."""
self.isdiscrete = {}
for stochastic in self.stochastics:
if stochastic.dtype in pm.StepMethods.integer_dtypes:
self.isdiscrete[stochastic] = True
elif stochastic.dtype in pm.StepMethods.bool_dtypes:
raise 'Binary stochastics not supported by AdaptativeMetropolis.'
else:
self.isdiscrete[stochastic] = False
def choose_direction(self, norm=True):
direction = np.dot(np.random.normal(size=self.n_points), self.history) - self.history_mean
if norm:
direction /= np.dot(direction, direction)
return direction
def dimension(self):
"""Compute the dimension of the sampling space and identify the slices
belonging to each stochastic.
"""
self.dim = 0
self._slices = {}
for stochastic in self.stochastics:
if isinstance(stochastic.value, np.matrix):
p_len = len(stochastic.value.A.ravel())
elif isinstance(stochastic.value, np.ndarray):
p_len = len(stochastic.value.ravel())
else:
p_len = 1
self._slices[stochastic] = slice(self.dim, self.dim + p_len)
self.dim += p_len
class HistoryAM(HistoryCovarianceStepper, pm.Metropolis):
_state = HistoryCovarianceStepper._state + ['accepted','rejected','adaptive_scale_factor','proposal_distribution','proposal_sd']
def __init__(self, stochastic, n_points=None, init_history=None, verbose=None, tally=False, proposal_sd=1):
HistoryCovarianceStepper.__init__(self, stochastic, n_points, init_history, verbose, tally)
self.proposal_distribution = "None"
self.adaptive_scale_factor = 1
self.accepted = 0
self.rejected = 0
self.proposal_sd=proposal_sd
self._state = HistoryAM._state
def reject(self):
for stochastic in self.stochastics:
stochastic.revert()
self.unstore()
def propose(self):
direction = self.choose_direction()
proposed_value = self.get_current_value() + direction * np.random.normal()*self.adaptive_scale_factor*self.proposal_sd
self.set_current_value(proposed_value)
self.store(proposed_value)
class HRAM(HistoryCovarianceStepper):
_state = HistoryCovarianceStepper._state + ['xprime_n', 'xprime_sds']
def __init__(self, stochastic, n_points=None, init_history=None, verbose=None, tally=False, xprime_sds=5, xprime_n=101):
HistoryCovarianceStepper.__init__(self, stochastic, n_points, init_history, verbose, tally)
self.xprime_sds = xprime_sds
self.xprime_n = xprime_n
self._state = HRAM._state
def step(self):
direction = self.choose_direction(norm=False)
current_value = self.get_current_value()
x_prime = np.vstack((current_value, np.outer(np.linspace(-self.xprime_sds,self.xprime_sds,self.xprime_n),direction) + current_value))
lps = np.empty(self.xprime_n+1)
lps[0] = self.logp_plus_loglike
for i in xrange(self.xprime_n):
self.set_current_value(x_prime[i+1])
lps[i+1] = self.logp_plus_loglike
next_value = x_prime[pm.rcategorical(np.exp(lps-pm.flib.logsum(lps)))]
self.set_current_value(next_value)
self.store(next_value)
if __name__ == '__main__':
import time
for kls in [HistoryAM, HRAM, pm.AdaptiveMetropolis]:
x = pm.Normal('x',0,1,size=1000)
y = pm.Normal('y',x,100,size=1000)
M = pm.MCMC([x,y])
# M.use_step_method(HistoryAM, [x,y])
if kls is pm.AdaptiveMetropolis:
M.use_step_method(kls, [x,y], delay=10, interval=10)
else:
M.use_step_method(kls, [x,y])
sm = M.step_method_dict[x][0]
if kls is HRAM:
sm.accepted = 0
sm.rejected = 0
t1 = time.time()
M.sample(100)
t2 = time.time()
print 'Class %s: %s seconds, accepted %i, rejected %i'%(kls.__name__, t2-t1, sm.accepted, sm.rejected)
import pylab as pl
pl.clf()
pl.plot(M.trace('x')[:,0], M.trace('y')[:,0],linestyle='none',marker='.',markersize=8,color=(.1,0,.8,),alpha=10./len(M.trace('x')[:]),markeredgewidth=0)
pl.title(kls.__name__)
from IPython.Debugger import Pdb
Pdb(color_scheme='Linux').set_trace()
# a = np.empty((1000,2))
# for i in xrange(1000):
# M.step_method_dict[x][0].step()
# a[i,0] = x.value
# a[i,1] = y.value
#
# import pylab as pl
# pl.plot(a[:,0], a[:,1], 'b.')
\documentclass[a4paper]{article}
\usepackage{fullpage}
\usepackage{epsfig}
\usepackage{pdfsync}
\usepackage{amsfonts}
\usepackage{amsmath}
\usepackage{amsthm}
\newtheorem{lemma}{Lemma}
\newtheorem*{definition}{Definition}
\newtheorem*{conjecture}{Conjecture}
\newtheorem*{theorem}{Theorem}
\begin{document}
\title{History-dependent MCMC}
\author{Anand}
\maketitle
You want to sample from target distribution $\pi$ on finite state space $\mathcal X$. You have a non-Markov kernel $K:\mathcal X^T\rightarrow(\mathcal X\rightarrow \mathbb R)$, so $K(x_{1\ldots T},\cdot)$ for length-$T$ history $x_{1\ldots T}$ is a probability distribution on $\mathcal X$. Define the $j$-step-ahead kernel $K^j$ by
\begin{eqnarray*}
K^j(x_{1\ldots T},x_{T+j})=\sum_{x_T+1}\ldots\sum_{x_{T+j-1}} \prod_{i=1}^j K(x_{i\ldots T+i-1}, x_{T+i}).
\end{eqnarray*}
\begin{definition}\label{def:marginally stationary}
$K$ is \emph{marginally stationary} with respect to $\pi$ if, for any distribution $\mu$ on $\mathcal X^T$ with marginals $\pi$,
\begin{eqnarray*}
E_{\mu(x_{1\ldots T})}\left[K(x_{1\ldots T}, x_{T+1})\right]=\pi(x_{t+1}).
\end{eqnarray*}
\end{definition}
That is, if $x_{1\ldots T}$ are distributed such that their marginals are all $\pi$, then after application of $x$ $x_{T+1}$ is distributed as $\pi$ as well. Note that for $T=1$, $K$ is a Markov kernel whose stationary distribution is $\pi$.
\begin{lemma}\label{lem:one}
If $K$ is marginally stationary with respect to $\pi$, and the distribution $\mu$ of $x_{1\ldots T}$ has marginals $\pi$, then the distribution $\mu^1$ of $x_{2\ldots T+1}$ after one iteration of $K$ has all of its marginals equal to $\pi$.
\end{lemma}
\begin{proof}
$\mu^1$ is
\begin{eqnarray*}
\mu^1(x_{2\ldots T+1})=\sum_{x_1}\mu(x_{1\ldots T})K(x_{1\ldots T},x_{T+1}).
\end{eqnarray*}
By marginally stationarity of $K$, the marginal of $x_{T+1}$ is $\pi$. For $i\in [2,T]$, the marginal can be computed as
\begin{eqnarray*}
\sum_{x_1}\ldots\sum_{x_{i-1}}\sum_{x_{i+1}}\ldots \sum_{x_T}\mu(x_{1\ldots T})\sum_{x_{T+1}}K(x_{1\ldots T},x_{T+1})
\end{eqnarray*}
which is simply equal to the $i$'th marginal of $\mu$, which is $\pi$ by hypothesis.
\end{proof}
\begin{lemma}\label{lem:two}
If $K$ is marginally stationary with respect to $\pi$, then the distribution $\mu^j$ of $x_{j+1\ldots T+j}$ after $j>1$ iterations of $K$ has all of its marginals equal to $\pi$.
\end{lemma}
\begin{proof}
$\mu^j$ is
\begin{eqnarray*}
\sum_{x_1}\ldots \sum_{x_j} \mu(x_{1\ldots T}) \prod_{i=1}^j K(x_{i\ldots T+i-1}, x_{T+i})\\
= \sum_{x_j} \left[\sum_{x_1}\ldots \sum_{x_{j-1}} \mu(x_{1\ldots T}) \prod_{i=1}^{j-1} K(x_{i\ldots T+i-1}, x_{T+i}) \right] K(x_{j\ldots T+j-1}, x_{T+j})\\
=\sum_{x_j} \mu^{j-1}(x_{j\ldots T+j-1}) K(x_{j\ldots T+j-1}, x_{T+j})
\end{eqnarray*}
A similar argument to lemma \ref{lem:one} shows that, if $\mu^{j-1}$'s marginals are all $\pi$, then $\mu^j$'s marginals are also all $\pi$. Since lemma \ref{lem:one} says that $\mu^1$'s marginals are all $\pi$, the claim follows by induction.
\end{proof}
\begin{theorem}\label{thm:marginals-preserved}
If $K$ is marginally stationary with respect to $\pi$, then $\pi$ is the stationary distribution for $K$ in the sense that, if the distribution $\mu$ of $x_{1\ldots T}$ has marginals $\pi$, then the marginal distribution $K^j\mu$ of $x_{T+j}$ is equal to $\pi$.
\end{theorem}
\begin{proof}\label{pf:marginals-preserved}
This is a special case of lemma \ref{lem:two}: after $K^j$ is applied to $\mu$, the marginals of the distribution of $x_{j+1\ldots T+j}$ are all equal to $\pi$.
\end{proof}
That means that many iterations of $K$ produce a usable MCMC trace for $x$ if the distribution of the initial history has the correct marginals. Note that all the theorems can be converted to `if and only if $K$ is marginally stationary' by considering all $\mu$ with marginals $\pi$, rather than particular distributions. % That makes me think it should be possible to prove the following, which says that if $K$ is marginally stationary with respect to $\pi$ then it is a valid jumping strategy.
% \begin{conjecture}
% If $K$ is marginally stationary with respect to $\pi$, then $K^j\eta$ converges to $\pi$ in some sense as $j\rightarrow \infty$ from any starting distribution $\eta$.
% \end{conjecture}
% \section*{Another class}
% Suppose $K$ is of the form $K(x_{1\ldots T},x_{T+1})=Q(x_{T},x_{T+1};\theta(x_{1\ldots T-1}))$, and that $Q$ is a reversible Markov kernel with respect to $\pi$ regardless of $\theta$. Wikipedia says that, for any probability distribution $\eta$ and tuning parameter $\theta$, Kullback-Leibler distance to $\pi$ decreases:
% \begin{eqnarray*}
% D\left(\eta(x_T)||\pi(x_T)\right)\ge D\left(\sum_{x_T\in\mathcal X}\eta(x_T)Q(x_T,x_{T+1};\theta)||\pi(x_T)\right)
% \end{eqnarray*}
% with equality only if $\eta=\pi$.
%
% % FIXME: This may not be true.
% \begin{lemma}\label{lem:Blah}
% For any starting distribution $\mu$,
% \begin{eqnarray*}
% D(\mu_T(x_T)||\pi(x_T))\ge D\left(\sum_{x_T\in\mathcal X}\mu_T(x_T)\sum_{x_1\in\mathcal X}\sum_{x_{T-1}\in\mathcal X}Q(x_T,x_{T+1};\theta)||\pi(x_T)\right)
% \end{eqnarray*}
% where $\mu_T$ denotes the $T$'th marginal of $\mu$. That is, if $K$ is used to generate an MCMC trace, the Kullback-Leibler divergence between the marginal of the head of the trace and $\pi$ does not increase. \textbf{NOTE:} this may not be true.
% \end{lemma}
% \begin{proof}\label{pf:kl-decreases}
% Expanding the left-hand term yields
% \begin{eqnarray*}
% D(\mu_T(x_T)||\pi(x_T))=\sum_{x_1\in\mathcal X}\ldots \sum_{x_T\in\mathcal X} \mu(x_{1\ldots T})\log \frac{\sum_{x_1\in\mathcal X}\ldots\sum_{x_{T-1}\in\mathcal X} \mu(x_{1\ldots T})}{\pi(x_T)}
% \end{eqnarray*}
% \end{proof}
\end{document}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment