@adrn
Last active September 25, 2023 06:01
run emcee with MPI on Hotfoot
"""
- Hotfoot is Columbia's shared Astronomy-Statistics supercomputing
cluster.
- Emcee is a kick-ass MCMC black-box for sampling probability
distributions & parameter estimation.
- MPI is a standardized message passing system designed for supporting
and running parallel code.
Emcee is an implementation of an affine-invariant, ensemble sampler --
key words here being *ensemple sampler*. The algorithm runs an ensemble
of 'walkers' -- each independent MCMC chains. Sounds parallel to me!
By default, emcee will let you distribute these walkers over multiple
processors using Python's multiprocessing package. This doesn't work
so well for big clusters because of shared memory. Instead, we can use
MPI to truly parallelize the sampler!
Read more about these things here:
https://wikis.cuit.columbia.edu/confluence/display/rcs/Hotfoot+HPC+Cluster+User+Documentation
http://dan.iel.fm/emcee/
http://en.wikipedia.org/wiki/Message_Passing_Interface
And about MPI + emcee:
http://dan.iel.fm/emcee/user/advanced/#using-mpi-to-distribute-the-computations
"""
import sys

import numpy as np
import emcee
from emcee.utils import MPIPool
def lnprob(x, ivar):
    """ This is just a multi-dimensional Gaussian, but I've added a
    little hack to simulate a calculation that takes longer. Without
    it, the evaluation is so fast that the bottleneck is setting up
    the MPI processes, so it would actually take *longer* to run
    using MPI...
    """
    # Busy-work to simulate an expensive likelihood:
    np.sum(np.arange(int(1E7)))
    return -0.5 * np.sum(ivar * x ** 2)
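As a quick sanity check, the Gaussian part of lnprob can be verified by hand; the busy-work sum is omitted here since it does not affect the return value (the function name is just for this illustration):

```python
import numpy as np

def lnprob_gaussian(x, ivar):
    # Same log-probability as in the script, minus the artificial delay.
    return -0.5 * np.sum(ivar * np.asarray(x) ** 2)

# For x = [1, 1] and ivar = 2: -0.5 * (2*1 + 2*1) = -2.0
print(lnprob_gaussian([1.0, 1.0], 2.0))  # -2.0
```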
ndim, nwalkers = 10, 20
ivar = 1. / np.random.rand(ndim)
p0 = [np.random.rand(ndim) for i in range(nwalkers)]
# Initialize the MPI pool
pool = MPIPool()

# Only the master process continues past this point; worker processes
# wait here for tasks from the master, then exit. (Copy and paste this.)
if not pool.is_master():
    pool.wait()
    sys.exit(0)

sampler = emcee.EnsembleSampler(nwalkers, ndim, lnprob,
                                args=(ivar,), pool=pool)
sampler.run_mcmc(p0, 100)

pool.close()
sys.exit(0)
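For context, the `pool` argument to EnsembleSampler just needs to be an object with a `map(function, iterable)` method -- MPIPool and multiprocessing.Pool both satisfy this. A minimal serial stand-in (handy for debugging before going parallel; the class name here is made up) might look like:

```python
import numpy as np

class SerialPool:
    """Minimal pool: the sampler only calls pool.map(function, iterable),
    so any object providing that method can stand in for MPIPool."""
    def map(self, func, iterable):
        return list(map(func, iterable))

def lnprob(x, ivar):
    # Same multi-dimensional Gaussian as above, without the busy-work.
    return -0.5 * np.sum(ivar * np.asarray(x) ** 2)

pool = SerialPool()
walkers = [np.random.rand(3) for _ in range(5)]
lnps = pool.map(lambda x: lnprob(x, 2.0), walkers)
print(len(lnps))  # 5
```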
"""
That's it for the Python code! Now how do we start the job? We use
the command 'mpirun' to start the python script!
test_mpi.sh
----------------------------------------------------------------------
#!/bin/sh
# Directives
#PBS -N test_mpi
#PBS -W group_list=hpcastro
#PBS -l nodes=4,walltime=01:00:00,mem=8gb
#PBS -M <your email here>
#PBS -m abe
#PBS -V
# Set output and error directories
#PBS -o localhost:/path/to/output/dir
#PBS -e localhost:/path/to/output/dir
#Command to execute Python program with MPI
mpirun -n 16 /path/to/emcee_mpi_hotfoot.py
#End of script
----------------------------------------------------------------------
"""
@JaberUA commented Sep 25, 2023
import time
import numpy as np
import emcee
from multiprocessing import Pool

x0 = [0.32, -6.2, 0.72, -19.4]

ndim, nwalkers = 4, 10
Transition = lambda x: [np.random.normal(x[0], 0.1), np.random.normal(x[1], 0.01),
                        np.random.normal(x[2], 0.1), np.random.normal(x[3], 0.01)]
pos = [np.array(Transition(x0)) for i in range(nwalkers)]
filename = 'Angular_BAO.h5'
backend = emcee.backends.HDFBackend(filename)
backend.reset(nwalkers, ndim)

#class emcee:
#num_threads=14
#with ThreadPoolExecutor(max_workers=num_threads) as pool:
with Pool(10) as pool:
    print("chi2 trial", chi2(0.3, -6.3, 0.7, -19.2))
    sampler = emcee.EnsembleSampler(nwalkers, ndim, lnp, backend=backend, pool=pool)
    start = time.time()
    # Run the MCMC:
    sampler.run_mcmc(pos, 5, progress=True)
    end = time.time()
    multi_time = end - start
    print("Multiprocessing took {0:.1f} seconds".format(multi_time))
    #print("{0:.1f} times faster than serial".format(serial_time / multi_time))

print(pos)

# Clear and run the production chain.
print("Running MCMC...")
#sampler.run_mcmc(pos, 1500)
print("Done.")

# Plotting the posterior distributions
burn = 0
params = sampler.chain[:, burn:, :].reshape((-1, ndim))

# Save the parameter chains to a file
np.savetxt('scalar-all.dat', np.array(params))
This is my code, and I have tried to parallelize it, but after some time it runs on only one core. Why? Can you help me here?
