Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
MCMC exercises
"""
Exercises for the Markov Chain Monte-Carlo (MCMC) course available at
http://users.aims.ac.za/~ioana/
"""
import numpy as np
import numpy.linalg as la
import pylab
from scipy import stats
from scipy import linalg
"""
1. Markov Chains.
"""
K = np.array([[0.9, 0.1], [0.3, 0.7]])
lambda_ = np.array([0.2, 0.8])
class MarkovChain(object):
def __init__(self, K, lambda_):
self.K = K # transition matrix/kernel
self.lambda_ = lambda_ # initial state distribution
def m_step_kernel(self, m):
return np.array((np.matrix(self.K) ** m))
def X_t(self, t):
return np.dot(lambda_, self.m_step_kernel(t))
def invariant_distribution(self):
eigenvalues, eigenvectors = la.eig(self.K.T)
idx = eigenvalues.argsort()[::-1][0]
mu = eigenvectors[:,idx]
return mu/mu.sum()
def sample(self, n):
s = [np.random.multinomial(1, self.lambda_).argmax()]
for i in range(n-1):
s.append(np.random.multinomial(1, self.K[s[-1],:]).argmax())
return s
mc = MarkovChain(K, lambda_)
def ex1_1():
"""
Compute the m-step kernel of K.
"""
print mc.m_step_kernel(2)
def ex1_2():
"""
Compute distribution of X_t.
"""
print mc.X_t(2)
def ex1_3():
"""
Compute invariant distribution.
"""
print mc.invariant_distribution()
def ex1_4():
"""
Plot the value of X_t over time to show that it converges
to the invariant distribution.
"""
distn = np.array([mc.X_t(t) for t in xrange(20)])
pylab.figure()
pylab.plot(distn[:,0], 'bs-', distn[:,1], 'rs-')
pylab.show()
def ex1_5():
"""
Plot a chain sample.
"""
N = 100
s = mc.sample(N)
pylab.figure()
pylab.plot(s, 'gs-')
pylab.show()
def ex1_6():
"""
Verify proportion of 0 and 1 after sampling is similar to
invariant distribution.
"""
N = 10000
s = mc.sample(N)
hist, bins = np.histogram(s, bins=2)
print hist/(N*1.0)
"""
2. Inverse cdf method and Rejection sampling.
"""
def sample_expo(n, lambda_):
"""
The exponential distribution has cdf:
F(x) = 1 - exp(lambda_ * x)
To sample from the exponential distribution:
* generate a sample u from the uniform distribution
* F^-1(u) = -log(1-u)/lambda_
"""
u = np.random.uniform(size=n)
return -np.log(1-u)/lambda_
def ex2_2():
"""
Plot the histogram for the data samples with sample_expo
and superimpose the density function of the exponential distribution
to check that they have the same shape.
"""
lambda_ = 2.0
n = 100
pylab.figure()
pylab.hist(sample_expo(n, lambda_), normed=True)
x = np.linspace(0,5,n)
pylab.plot(x, lambda_*np.exp(-lambda_*x))
pylab.show()
def sample_gamma_integer(k, lambda_):
"""
To sample from Gamma(k, lambda_), where k is an integer,
we can take advantage of the fact that
Y = X_1 + ... + X_k ~ Gamma(k, lambda_)
when X_1, ..., X_k are i.i.d and X_k ~ Expo(lambda_)
"""
return np.sum(sample_expo(k, lambda_))
def gamma_pdf(x, alpha, lambda_):
return stats.gamma.pdf(x*lambda_, alpha)*lambda_
def ex2_4():
"""
Plot the proposal distribution in red and the distribution
to sample from in blue to show that the blue curve is
always under the red curve.
"""
alpha = 5.7
k = np.floor(alpha)
lambda_ = 2.0
# multiplicative factore to ensure that the proposal distribution
# is always above
M = gamma_pdf(alpha-k,alpha,lambda_)/gamma_pdf(alpha-k,k,lambda_-1)
x = np.linspace(0,10,100)
pylab.figure()
pylab.plot(x, gamma_pdf(x,alpha,lambda_),'b-')
pylab.plot(x, M*gamma_pdf(x,k,lambda_-1),'r:')
pylab.show()
def sample_gamma(alpha, lambda_):
"""
Sample from the Gamma(alpha,lambda) using rejection sampling.
sample_gamma_integer is used as the proposal distribution.
"""
k = np.floor(alpha)
M = gamma_pdf(alpha-k,alpha,lambda_)/gamma_pdf(alpha-k,k,lambda_-1)
while True:
x = sample_gamma_integer(k, lambda_-1)
prob_accept = gamma_pdf(x, alpha, lambda_) / \
(M * gamma_pdf(x, k, lambda_-1))
u = np.random.random()
if u <= prob_accept:
return x
def ex2_6():
"""
Plot the theoritical density and the histogram obtained from sampling
to show that they have the same shape.
"""
alpha = 5.7
lambda_ = 2.0
x = [sample_gamma(alpha,lambda_) for i in xrange(1000)]
pylab.figure()
pylab.hist(x, normed=True)
x = np.linspace(0,10,100)
pylab.plot(x, gamma_pdf(x, alpha,lambda_))
pylab.show()
"""
3. Importance sampling.
"""
# only the first 10 are considered labeled
y = np.array([3, 6, 3, 5, 9, 14, 12, 11, 19, 18,
15, 4, 1, 6, 11, 21, 11, 3, 7, 18])
alpha = 0.1
beta = 0.1
# use rejection sampling above but too slow
#def sample_gamma_n(n, alpha, lambda_):
# return np.array([sample_gamma(alpha,lambda_) for i in xrange(n)])
def sample_gamma_n(n, alpha, beta):
return stats.gamma(alpha).rvs(n)/beta
def sample_lambda(n, alpha_post_1, alpha_post_2, beta_post):
# sample from f(lambda1|y1,...,y5)
lambda_1 = sample_gamma_n(n, alpha_post_1, beta_post)
# sample from f(lambda2|y6,...,y10)
lambda_2 = sample_gamma_n(n, alpha_post_2, beta_post)
weights = np.zeros(n)
# compute the weight for each sample
for i in xrange(n):
weights[i] = np.prod(0.5 * stats.poisson(lambda_1[i]).pmf(y[10:20]) + 0.5 * stats.poisson(lambda_2[i]).pmf(y[10:20]))
return weights, lambda_1, lambda_2
def ex3_4():
"""
Find the parameters lambda1 and lambda2 of two groups
having a Poisson distribution.
The proposal distribution used is the distribution fitted on labeled
data.
"""
# by definition the posterior distribution is a gamma distribution
# with the following alpha and beta parameters
alpha_post_1 = alpha + np.sum(y[0:5])
alpha_post_2 = alpha + np.sum(y[5:10])
beta_post = beta + 5
# the expectation is alpha/beta (shape/scale)
post_mean_1_labeled = alpha_post_1 / beta_post
post_mean_2_labeled = alpha_post_2 / beta_post
weights, lambda_1, lambda_2 = sample_lambda(1000, alpha_post_1, alpha_post_2, beta_post)
# compute the expectation (weighted sum)
post_mean_1_all = np.sum(lambda_1 * weights) / np.sum(weights)
post_mean_2_all = np.sum(lambda_2 * weights) / np.sum(weights)
print "Labeled", post_mean_1_labeled, post_mean_2_labeled
print "All", post_mean_1_all, post_mean_2_all
def kde(data, newpoints, weights, h=1.0):
"""
Kernel Density Estimation
"""
weights = weights / np.sum(weights) * len(weights)
def K(x, xi):
return 1/np.sqrt(2*np.pi) * np.exp(-(x-xi)**2/(2*h**2))
def f(x, xi):
n = len(xi)
return 1/(n*h) * np.sum(weights * K(x,xi))
return np.array([f(x, data) for x in newpoints])
def ex3_5():
"""
Plot the density with and without unlabeled data.
Kernel Density Estimation is used for the former in order to extrapolate to
new values.
"""
alpha_post_1 = alpha + np.sum(y[0:5])
alpha_post_2 = alpha + np.sum(y[5:10])
beta_post = beta + 5
weights, lambda_1, lambda_2 = sample_lambda(1000, alpha_post_1, alpha_post_2, beta_post)
t = np.linspace(0,20,1000)
lambda_1_density = kde(lambda_1, t, weights)
pylab.figure(1)
pylab.plot(t, lambda_1_density,'r-')
pylab.plot(t, gamma_pdf(t, alpha_post_1, beta_post), 'g:')
pylab.show()
lambda_2_density = kde(lambda_2, t, weights)
pylab.figure(2)
pylab.plot(t, lambda_2_density,'r-')
pylab.plot(t, gamma_pdf(t, alpha_post_2, beta_post), 'g:')
pylab.show()
def ex3_6():
"""
Find the probability to be in group1 for the unlabeled examples (11 to 20).
"""
alpha_post_1 = alpha + np.sum(y[0:5])
alpha_post_2 = alpha + np.sum(y[5:10])
beta_post = beta + 5
n = 1000
weights, lambda_1, lambda_2 = sample_lambda(n, alpha_post_1, alpha_post_2, beta_post)
# for each of the n samples of lambda1 and lambda2
# we want to know the probability of belonging to group 1 or 2
allocations = np.zeros((n,10))
for i in xrange(n):
allocations[i,:] = stats.poisson(lambda_1[i]).pmf(y[10:20]) / (stats.poisson(lambda_1[i]).pmf(y[10:20]) + stats.poisson(lambda_2[i]).pmf(y[10:20]))
# then we want to take the expectation over the n samples
# which we do by taking the weighted average
# using the weights obtained by importance sampling
# note: the weights are associated with the n samples of lambda
# not with the 10 unlabeled data!
prob_of_group_1 = np.zeros(10)
for i in range(10):
prob_of_group_1[i] = sum(allocations[:,i]*weights) / sum(weights)
print prob_of_group_1
"""
4. Gibbs sampling.
"""
def sample_conditional(x2, mu1, mu2, sigma1_2, sigma2_2, sigma12):
mu = mu1 + (sigma12/sigma2_2) * (x2-mu2)
sigma_2 = sigma1_2 - (sigma12**2)/sigma2_2
return np.random.normal(mu, np.sqrt(sigma_2)) # return x1
def gibbs_sampler(n, mu, cov):
"""
Perform Gibbs sampling for a bivariate normal distribution.
mu = [mu1, mu2]
cov [[sigma1**2,sigma12],[sigma21, sigma2**2]]
"""
x1 = np.zeros(n)
x2 = np.zeros(n)
# initialization
x1[0] = np.random.normal(mu[0], np.sqrt(cov[0,0]))
x2[0] = np.random.normal(mu[1], np.sqrt(cov[1,1]))
for t in range(1,n):
x1[t] = sample_conditional(x2[t-1], mu[0], mu[1], cov[0,0], cov[1,1],
cov[0,1])
x2[t] = sample_conditional(x1[t], mu[1], mu[0], cov[1,1], cov[0,0],
cov[1,0])
return x1, x2
def ex4_2():
"""
Plot (x1,x2)
(t, x1)
(t, x2)
"""
mu = np.array([0,0])
cov = np.array([[4,1],[1,4]])
cov = np.array([[4,2.8],[2.8,4]])
n = 3000
x1, x2 = gibbs_sampler(n, mu, cov)
pylab.figure(0)
pylab.plot(x1[2900:],x2[2900:])
pylab.figure(1)
pylab.plot(x1,'b')
pylab.figure(2)
pylab.plot(x2,'r')
pylab.show()
data = np.array([x1,x2])
# sample mean and covariance matrix
print np.mean(data, axis=1)
print np.cov(data, rowvar=1)
def ex4_3():
"""
Plot P(x1 >=0 and x2 >= 0) to show that it converges as t increases.
"""
mu = np.array([0,0])
cov = np.array([[4,1],[1,4]])
cov = np.array([[4,2.8],[2.8,4]])
n = 3000
x1, x2 = gibbs_sampler(n, mu, cov)
prob = np.zeros(n)
count = int(x1[0] >= 0 and x2[0] >= 0)
prob[0] = count
for t in range(1,n):
count += int(x1[t] >= 0 and x2[t] >= 0)
prob[t] = count * 1.0 / t
pylab.figure(3)
pylab.plot(prob)
pylab.axis([-200,n+200,None,None])
pylab.show()
"""
4. Metropolis-Hasting.
"""
def normal_pdf(x, mu, sigma):
inv_sigma = linalg.inv(sigma)
x_minus_mu = x-mu
return np.exp(-0.5*np.dot(np.dot(x_minus_mu.T,inv_sigma),x_minus_mu))/ \
(2*np.pi*np.sqrt(linalg.det(sigma)))
def mh_sampling(n, mu, sigma, mu_prop=0, sigma_prop=2.5):
"""
Metropolis-Hasting sampling with a symmetric proposal distribution (aka
Metropolis sampling) for the bivariate gaussian.
"""
x = np.zeros((n,2))
x[0,:] = np.array([0,0]) # arbitrary initial values
accepted_n = 0
for t in xrange(1,n):
# can sample 2 iid samples from a univariate normal distribution
# since the covariance matrix of the proposal distribution has zero
# for non-diagonal values
epsilon = np.random.normal(mu_prop, sigma_prop, size=2)
x_new = x[t-1,:] + epsilon
# for code clarity normal_pdf is recomputed at every iteration but it
# could be saved
p_accept = min(1.0, normal_pdf(x_new, mu, sigma) / normal_pdf(x[t-1,:], mu, sigma))
if np.random.random() < p_accept:
accepted_n += 1
x[t,:] = x_new
else:
x[t,:] = x[t-1,:]
print "The proportion of accepted values is", accepted_n*1.0/n
return x
def autocorrelation(x):
return np.corrcoef(x[1:],x[:-1])[0,1]
def ex5_3():
"""
Sampling from a bivariate normal distribution using Metropolis-Hasting.
"""
mu = np.array([0,0])
sigma = np.array([[4,1],[1,4]])
# sigma = np.array([[4,2.8],[2.8,4]])
n = 1000
x = mh_sampling(n, mu, sigma, sigma_prop=2.5)
pylab.figure(0)
pylab.plot(x[:,0],'b')
pylab.title("Sample path of X_1")
pylab.figure(1)
pylab.plot(x[:,1],'r')
pylab.title("Sample path of X_2")
# Plot the mean to show that it converges to mu
x1_cummean = np.cumsum(x[:,0]) / (1 + np.arange(n))
x2_cummean = np.cumsum(x[:,1]) / (1 + np.arange(n))
pylab.figure(2)
pylab.plot(x1_cummean, "b")
pylab.title("Empirical mean of X_1")
pylab.figure(3)
pylab.plot(x2_cummean, "r")
pylab.title("Empirical mean of X_2")
pylab.show()
# Autocorrelation
x1_sd = np.sqrt(np.var(x[:,0]))
x2_sd = np.sqrt(np.var(x[:,1]))
x1_autocorr = autocorrelation(x[:,0])
x2_autocorr = autocorrelation(x[:,1])
print "The autocorrelation of X_1 is", x1_autocorr
print "The autocorrelation of X_2 is", x2_autocorr
# Effective sample size
x1_ess = n * (1 - x1_autocorr) / (1 + x1_autocorr)
x2_ess = n * (1 - x2_autocorr) / (1 + x2_autocorr)
print "The effective sample size of X_1 is", x1_ess
print "The effective sample size of X_2 is", x2_ess
if __name__ == "__main__":
import sys
import __main__
getattr(__main__, "ex" + str(sys.argv[1]))()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.