@sixy6e
Last active March 4, 2024 13:08
mpi-logging-example

A simple MPI logging and compute example

The simple logging example can be run via:

  • mpiexec -n 4 python mpi_logger.py

For the compute and logging example:

  • mpiexec -n 4 python mock_execution.py

The MPI scheduler is provided by schwimmbad. The gist consists of three files: mock_execution.py (the driver), mpi_logger.py (the MPI-aware logging handler), and task.py (the example task).

mock_execution.py:

#!/usr/bin/env python
from os import makedirs
from os.path import exists
import logging
from mpi4py import MPI
import schwimmbad
import structlog
from mpi_logger import MPIFileHandler, COMMON_PROCESSORS
from task import SomeTask

# configure structlog to log in a specific way
structlog.configure(
    processors=COMMON_PROCESSORS,
    logger_factory=structlog.stdlib.LoggerFactory()
)

# I/O handler
HANDLER = MPIFileHandler("execution.log")
FORMATTER = logging.Formatter('%(message)s')
HANDLER.setFormatter(FORMATTER)

# initialise a logger
LOGGER = logging.getLogger('status')
LOGGER.setLevel(logging.DEBUG)
LOGGER.addHandler(HANDLER)
def main():
    """
    Simulate the execution of a bunch of tasks using MPI,
    and set up MPI logging.
    We should see all processor ranks logging similar events right up
    until `pool = schwimmbad.choose_pool(mpi=True)` is called.
    After that, only rank 0 will be logging events and executing lines
    of code.
    All tasks should contain the same value for rank as an argument,
    and the value will be zero.
    The tasks themselves will be carried out by the workers,
    ranks 1 and higher, while rank 0 acts as a scheduler.
    """
    # comm and processor info
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    # bind the logs to include the rank and size
    log = structlog.get_logger('status').bind(rank=rank, size=size)
    log.info('checking rank and size')

    # a simple mechanism that tells the workers to wait for the
    # scheduler's instructions
    log.info('about to initialise an MPI pool')
    pool = schwimmbad.choose_pool(mpi=True)

    # after the pool is created, only rank 0 executes the following lines
    log.info('MPI pool chosen')

    # change as required
    outdir = 'outdir'
    bands = range(1, 21)

    # the task class that calls some function
    task = SomeTask()
    log.info('task chosen')

    # define the list of tasks and the input parameters
    args = [(outdir, band, rank) for band in bands]

    # the rank arg should always be zero (remember we defined the MPI pool)
    log.info(args=args)

    if not exists(outdir):
        log.info('create output directory')
        makedirs(outdir)

    # map tasks across the pool of workers;
    # rank 0 acts only as a scheduler, ranks 1 and higher are workers
    pool.map(task, args)
    pool.close()

    log.info('finished processing')


if __name__ == "__main__":
    main()
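Because COMMON_PROCESSORS ends with JSONRenderer(sort_keys=True), every rank appends one JSON object per line to execution.log. An illustrative line from a 4-process run (the timestamp and values here are made up, not captured output) might look like:

{"event": "checking rank and size", "level": "info", "rank": 3, "size": 4, "timestamp": "2024-03-04T13:08:00.123456Z"}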
mpi_logger.py:

#!/usr/bin/env python
"""
Logging configuration for JSON Lines structured logging.
Defines structured logging for:
* Errors -- qualname error
* Status messages -- qualname status
"""
import logging
from mpi4py import MPI
import structlog
from structlog.processors import JSONRenderer

COMMON_PROCESSORS = [
    structlog.stdlib.add_log_level,
    structlog.processors.TimeStamper(fmt="ISO"),
    structlog.processors.StackInfoRenderer(),
    structlog.processors.format_exc_info,
    JSONRenderer(sort_keys=True)
]
def get_wrapped_logger(logger_name: str = 'root', **kwargs):
    """ Returns a structlog equivalent for the named logger """
    return structlog.wrap_logger(
        logging.getLogger(logger_name),
        COMMON_PROCESSORS,
        **kwargs
    )


class FormatJSONL(logging.Formatter):
    """ Prevents printing of the stack trace to enable JSON Lines output """

    def formatException(self, ei):
        """ Disables printing separate stack traces """
        return


ERROR_LOGGER = get_wrapped_logger('error', stack_info=True)
STATUS_LOGGER = get_wrapped_logger('status')
class MPIIOStream(object):
    """
    A very basic MPI stream handler for synchronised I/O.
    """

    def __init__(self, filename, comm, mode):
        self._file = MPI.File.Open(comm, filename, mode)
        self._file.Set_atomicity(True)

    def write(self, msg):
        # if for some reason we don't have a unicode string...
        try:
            msg = msg.encode()
        except AttributeError:
            pass
        self._file.Write_shared(msg)

    def sync(self):
        """
        Synchronise the processes
        """
        self._file.Sync()

    def close(self):
        self.sync()
        self._file.Close()
class MPIFileHandler(logging.StreamHandler):
    """
    A basic MPI file handler for writing log files.
    Internally opens a synchronised MPI I/O stream via MPIIOStream.
    Ideas and some code from:
    * https://groups.google.com/forum/#!topic/mpi4py/SaNzc8bdj6U
    * https://gist.github.com/JohnCEarls/8172807
    * https://stackoverflow.com/questions/45680050/cannot-write-to-shared-mpi-file-with-mpi4py
    """

    def __init__(self, filename,
                 mode=MPI.MODE_WRONLY | MPI.MODE_CREATE, comm=MPI.COMM_WORLD):
        self.filename = filename
        self.mode = mode
        self.comm = comm
        super(MPIFileHandler, self).__init__(self._open())

    def _open(self):
        stream = MPIIOStream(self.filename, self.comm, self.mode)
        return stream

    def close(self):
        if self.stream:
            self.stream.close()
            self.stream = None

    def emit(self, record):
        """
        Emit a record.
        We have to override emit, as logging.StreamHandler makes two calls
        to 'write': the first for the message, and the second for the
        terminator. This posed a problem for MPI, where a second process
        could call 'write' in between these two calls and create a
        conjoined log message.
        """
        msg = self.format(record)
        self.stream.write('{}{}'.format(msg, self.terminator))
        self.flush()
def main():
    """
    A sample test run.
    """
    comm = MPI.COMM_WORLD
    logger = logging.getLogger("node[%i]" % comm.rank)
    # logger = logging.getLogger("func-status")  # another name example
    logger.setLevel(logging.DEBUG)

    mpi_handler = MPIFileHandler("test.log")
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    mpi_handler.setFormatter(formatter)
    logger.addHandler(mpi_handler)

    # sample log levels
    logger.debug('debug message')
    logger.info('info message')
    logger.warning('warn message')  # logger.warn is deprecated
    logger.error('error message')
    logger.critical('critical message')


if __name__ == "__main__":
    main()
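Running the handler's self-test with, for example, mpiexec -n 2 python mpi_logger.py should leave interleaved but intact (never conjoined) lines in test.log. Given the formatter string above, an illustrative line (timestamp made up) would be:

2024-03-04 13:08:00,123 - node[0] - DEBUG - debug message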
task.py:

#!/usr/bin/env python
from os.path import join as pjoin
import numpy
import h5py
from mpi4py import MPI
import structlog
from mpi_logger import COMMON_PROCESSORS

# log file i/o is initialised elsewhere
LOG = structlog.get_logger('status')

# the rank of the worker executing the work (should always be 1 or higher)
COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()
class SomeTask(object):
    """
    A simple class definition (similar to luigi) that defines
    a work method for custom functions.
    """

    def work(self, outdir, band, rank):
        # rank2 will be the process executing the work,
        # but the arg 'rank' will always be zero, as rank 0 defined the work
        log = LOG.bind(rank=rank, rank2=RANK)
        log.info("start processing band: {}".format(band))

        out_fname = pjoin(outdir, 'band-{}.h5'.format(band))
        with h5py.File(out_fname, 'w') as fid:
            data = numpy.random.randint(0, 10001, (4000, 4000))
            kwargs = {
                'data': data,
                'compression': 'lzf',
                'chunks': (1, 4000),
                'shuffle': True
            }
            fid.create_dataset('data', **kwargs)

        log.info("finished processing band: {}".format(band))

    def __call__(self, args):
        self.work(*args)
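As a quick sanity check after a run, each output file should hold a single 4000 x 4000 integer dataset named 'data'. A minimal sketch (the path assumes the default outdir used in mock_execution.py):

import h5py

with h5py.File('outdir/band-1.h5', 'r') as fid:
    dataset = fid['data']
    # expect (4000, 4000); dtype is platform dependent (int64 on most Linux builds)
    print(dataset.shape, dataset.dtype)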