Skip to content

Instantly share code, notes, and snippets.

@philastrophist
Last active June 10, 2021 15:26
Show Gist options
  • Save philastrophist/619fb7045d760212a06c95f811effb21 to your computer and use it in GitHub Desktop.
Save philastrophist/619fb7045d760212a06c95f811effb21 to your computer and use it in GitHub Desktop.
import emcee
from sklearn.mixture import GaussianMixture
import vegas
from astropy.cosmology import Planck15
import numpy as np
from scipy import stats
from astropy import units as u
import matplotlib.pyplot as plt
jy_factor = (u.W / u.Hz / u.m / u.m).to(u.Jy)
whz_factor = (u.Jy * u.m * u.m).to(u.W / u.Hz)
class Integrator:
def __init__(self, logm_observed, logm_error, f_observed, f_error, z_observed, z_error,
mu, V,
logl_range, logm_range,
z_range=(0.0001, 1.), zpoints=1000, n_adapt_samples=1000,
zdeg=2, cosmo=Planck15, alpha=-0.71):
self.f_observed = f_observed
self.f_error = f_error
self.z_observed = z_observed
self.z_error = z_error
self.logm_observed = logm_observed
self.logm_error = logm_error
self.mu = mu
self.V = V
self.gaussian_component = stats.multivariate_normal(mu, V)
if z_range[0] <= 0:
raise ValueError(f"minimum z cannot be zero")
self.z_range = z_range
self.alpha = alpha
_zs = np.linspace(*z_range, num=zpoints)
_dls = cosmo.luminosity_distance(_zs).to(u.m).value
self.dl_params = np.polyfit(_zs, _dls, zdeg)
self.logm_range = logm_range
self.logl_range = logl_range
self.adaptive_map = vegas.AdaptiveMap([logm_range, logl_range, z_range]) # uniform map
self.logm_dist = stats.norm(self.logm_observed, self.logm_error)
self.f_dist = stats.norm(self.f_observed, self.f_error)
self.z_dist = stats.norm(self.z_observed, self.z_error)
self._samples = self.generate_map_samples(n_adapt_samples)
def logdl(self, z):
"""luminosity_distance in metres"""
return np.log10(np.polyval(self.dl_params, z))
def logl_to_f(self, logl, z):
"""log[W/Hz] -> Jy (10^-26W/m2/Hz)
W/Hz/m^2
"""
return (10**(logl - np.log10(4 * np.pi) - (2 * self.logdl(z)) + ((1 + self.alpha) * np.log10(1 + z)))) * jy_factor
def f_to_logl(self, f, z):
return np.log10(f * whz_factor * 4 * np.pi) + (2 * self.logdl(z)) - ((1 + self.alpha) * np.log10(1 + z))
def invdetjacobian(self, f, z):
"""
The jacobian for the transform `P(f,z | vL, vz) -> P(log[L], z | vL, vz)` is
[[1/f, dL/dz],
[0, 1]]
1 / abs(determinant) ~ f
"""
return f / np.log(10) # since we're using log10
def logprob_lz(self, true_logl, true_z):
true_f = self.logl_to_f(true_logl, true_z)
return self.f_dist.logpdf(true_f) + np.log(self.invdetjacobian(true_f, true_z))
def logprob_observed_given_true(self, true_m, true_logl, true_z):
logl_part = self.logprob_lz(true_logl, true_z)
z_part = self.z_dist.logpdf(true_z)
m_part = self.logm_dist.logpdf(true_m)
return logl_part + z_part + m_part
def logprob_gaussian_component(self, true_mlogl):
return self.gaussian_component.logpdf(true_mlogl)
def logintegrand(self, x):
r = np.where(x[:, 2] >= 0, self.logprob_observed_given_true(x[:, 0], x[:, 1], x[:, 2]) + \
self.logprob_gaussian_component(x[:, :2]), -np.inf)
r[~np.isfinite(r)] = -np.inf
return r
def sample_true_given_observed(self, n):
zs = self.z_dist.rvs(n)
fs = self.f_dist.rvs(n)
logms = self.logm_dist.rvs(n)
logls = self.f_to_logl(fs, zs)
r = np.stack([logms, logls, zs]).T
return r[np.isfinite(r).all(axis=1)]
def generate_map_samples(self, ninit, nwalkers=100, chain=200, burn=100):
comp_samples = np.concatenate([self.gaussian_component.rvs(ninit), self.z_dist.rvs((ninit, 1))], axis=1)
samples = np.concatenate([self.sample_true_given_observed(ninit), comp_samples])
gmm = GaussianMixture(1).fit(samples)
samples = np.concatenate([samples, gmm.sample(ninit)[0]])
ndim = 3
p0 = samples.reshape(-1, ndim)[:nwalkers, :]
nwalkers = p0.shape[0]
sampler = emcee.EnsembleSampler(nwalkers, ndim, self.logintegrand, vectorize=True)
sampler.run_mcmc(p0, chain)
return sampler.chain[:, burn:].reshape(-1, ndim)
def integrate(self, neval=1e5, nitn=10, nadapt=10, alpha=0.1):
x, y = self._samples, self.logintegrand(self._samples)
adjustment = -np.mean(y)
y = np.exp(y + adjustment)
self.adaptive_map.adapt_to_samples(x, y, nitn=nadapt) # precondition map
integ = vegas.Integrator(self.adaptive_map, alpha=alpha)
func = vegas.batchintegrand(lambda x: np.exp(self.logintegrand(x) + adjustment))
return integ(func, neval=neval, nitn=nitn), np.exp(adjustment)
def plot_result(self, r):
plt.figure()
plt.errorbar(np.arange(len(r.itn_results)), *zip(*[(i.mean, i.sdev) for i in r.itn_results]))
plt.axhspan(r.mean - r.sdev, r.mean + r.sdev, alpha=0.2)
def plot_plane(self, log=False, slices=(slice(8, 12, 0.05), slice(19,22,0.05))):
plt.figure()
grid = np.mgrid[slices[0], slices[1], self.z_observed:self.z_observed+0.001:0.001][..., :1]
r = self.logintegrand(grid.reshape(grid.shape[0], -1).T).T.reshape(grid.shape[1:])
if not log:
r = np.exp(r)
plt.pcolormesh(grid[0, ..., 0], grid[1, ..., 0], r[..., 0])
plt.colorbar()
m, l = self.sample_true_given_observed(10000)[:, :2].T
h, xedges, yedges = np.histogram2d(m, l, bins=10)
xbins = xedges[:-1] + (xedges[1] - xedges[0]) / 2
ybins = yedges[:-1] + (yedges[1] - yedges[0]) / 2
h = h.T
plt.contour(xbins, ybins, h, levels=10, cmap='Reds')
gaussian_prob = np.exp(self.logprob_gaussian_component(grid[:2, ..., 0].reshape(2, -1).T).T.reshape(grid.shape[1:-1]))
plt.contour(grid[0, ..., 0], grid[1, ..., 0], gaussian_prob, cmap='Greys')
plt.xlabel('logM')
plt.ylabel('logL')
plt.xlim(slices[0].start, slices[0].stop)
plt.ylim(slices[1].start, slices[1].stop)
def plot_luminosity_distribution(self, z=None):
plt.figure()
z = self.z_observed if z is None else z
ls = np.linspace(*self.logl_range, num=1000)
lnp = self.logprob_lz(ls, z)
plt.plot(ls, np.exp(lnp))
plt.xlabel('logL')
if __name__ == '__main__':
mu = [11, 20.8]
cov = np.eye(2) * 0.01
f, ferr, z, zerr = 1e-4, 1e-3, 0.085, 0.0001
m, merr = 11, 0.1
# logprobs = []
# ferrs = [1e-5, 1e-4, 1e-3, 1e-2, 1e-1]
# for ferr in ferrs:
# integrator = Integrator(m, merr, f, ferr, z, zerr, mu, cov, [0, 50], [0, 50])
# r, adjustment = integrator.integrate()
# logprobs.append(np.log(r.val) - adjustment)
# plt.plot(ferrs, logprobs)
integrator = Integrator(m, merr, f, ferr, z, zerr, mu, cov, [0, 50], [0, 50])
r, adjustment = integrator.integrate()
# print(f'result = {r / np.exp(adjustment)} adjustment=exp({adjustment})')
# print(r.summary())
print(np.log(r) - adjustment)
integrator.plot_plane()
integrator.plot_luminosity_distribution()
integrator.plot_result(r)
plt.show()
@philastrophist
Copy link
Author

Added sampler to get better initial adaption samples

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment