Create a gist now

Instantly share code, notes, and snippets.

Simple parallelization using mpi4py

For basic applications, MPI is as easy to use as any other message-passing system. The sample scripts below contain the complete communications skeleton for a data-parallel (embarrassingly parallel) problem using the mpi4py package.

Within the code is a description of the few functions needed to write typical parallel applications.

mpi-submit.py - Parallel application with simple partitioning: unbalanced load.

mpi-submit2.py - Parallel application with master/slave scheme: dynamically balanced load.

jobs.sh - Example of PBS script to run the parallel jobs on HPC.

All the code is available here: https://github.com/fspaolo/mpisubmit

Unbalanced parallelization (simple partitioning)

Script mpi-submit.py:

#!/usr/bin/env python
"""
Parallel application with simple partitioning (unbalanced load)

"""
# Fernando Paolo <fpaolo@ucsd.edu>
# Jan 15, 2013

import os
import sys
import numpy as np
from mpi4py import MPI


def simple_partitioning(length, num_procs):
    """Split `length` items into `num_procs` near-equal chunk sizes.

    Returns a list of num_procs integers summing to length; the first
    (length % num_procs) chunks each get one extra item.
    """
    # Use floor division explicitly: plain '/' truncates on Python 2
    # ints but yields a float on Python 3, which would corrupt the
    # slice offsets computed from these sublengths.
    sublengths = [length // num_procs] * num_procs
    for i in range(length % num_procs):    # spread the remainder
        sublengths[i] += 1
    return sublengths

def get_subproblem_input_args(input_args, my_rank, num_procs):
    """Return the contiguous slice of `input_args` owned by `my_rank`.

    The partition boundaries come from simple_partitioning(), so ranks
    with a lower index absorb any remainder items.
    """
    chunk_sizes = simple_partitioning(len(input_args), num_procs)
    start = sum(chunk_sizes[:my_rank])
    stop = start + chunk_sizes[my_rank]
    return input_args[start:stop]

def program_to_run(string):
    """Return the interpreter prefix needed to execute `string`.

    `string` is "prog [args...]".  Returns 'python ' when the program
    token itself is a .py script, else '' (e.g. a compiled executable).
    """
    tokens = string.split()
    prog = tokens[0] if tokens else string
    # Check only the program token with endswith(): the original
    # substring test ('.py' in string) misfired whenever any argument
    # merely contained ".py" (e.g. a data file named "x.py.txt").
    if prog.endswith('.py'):
        return 'python '
    return ''  # './'


# --- driver: executed once per MPI rank ---------------------------------
comm = MPI.COMM_WORLD
my_rank = comm.Get_rank()    # this process's rank (0 .. num_procs-1)
num_procs = comm.Get_size()  # total number of MPI processes launched

# Usage: mpiexec ... mpi-submit.py '<prog [args]>' file1 file2 ...
prog_and_args = sys.argv[1]  # program (plus any fixed args) to run per file
files_in = sys.argv[2:]      # full list of input files to distribute

# Each rank computes its own static share of the files (no communication
# needed: every rank derives the same partition independently).
run = program_to_run(prog_and_args)
my_files = get_subproblem_input_args(files_in, my_rank, num_procs)
# Run the program once with ALL of this rank's files on the command line.
os.system('%s%s %s' % (run, prog_and_args, ' '.join(my_files)))

# Echo the exact command this rank executed (Python 2 print statement).
print '%s%s %s' % (run, prog_and_args, ' '.join(my_files))

Balanced parallelization (master/slave)

Script mpi-submit2.py:

#!/usr/bin/env python
"""
For basic applications, MPI is as easy to use as any other 
message-passing system. The sample code below contains the complete 
communications skeleton for a dynamically load balanced master/slave 
application. Following the code is a description of the few functions 
necessary to write typical parallel applications.

important parameters
--------------------

status = MPI.Status()               # where all info is stored

# Receive results from a slave
result = comm.recv(                 # message buffer
         source=MPI.ANY_SOURCE,     # receive from any sender (-1)
         tag=MPI.ANY_TAG,           # any type of message (-1)
         status=status)             # info about the received msg (class)

# Send the slave a new work unit
comm.send(work,                     # message buffer
         dest=status.Get_source(),  # to whom we just received from
         tag=WORKTAG)               # user chosen message tag

"""
# Fernando Paolo <fpaolo@ucsd.edu>
# Jan 15, 2013

import os
import sys
import numpy as np
from mpi4py import MPI
from Queue import Queue

WORKTAG = 1
DIETAG = 0

class Work(object):
    """FIFO queue of shell commands, one per input file.

    Files are enqueued largest first so the slowest jobs are started
    earliest, which improves the dynamic load balance.
    """

    def __init__(self, prog, files):
        # Important: sort (in place) by file size, decreasing, before
        # queueing -- big files should be dispatched first.
        files.sort(key=lambda path: os.stat(path).st_size, reverse=True)
        self.work = Queue()
        for fname in files:
            self.work.put(' '.join([prog, fname]))

    def get_next(self):
        """Return the next command string, or None when exhausted."""
        if self.work.empty():
            return None
        return self.work.get()
 

def do_work(work):
    """Execute one work unit: a command string of the form '<prog> <file>'.

    Prefixes 'python ' when the program token is a .py script;
    otherwise runs the command as-is (e.g. './prog file').
    """
    tokens = work.split()
    prog = tokens[0] if tokens else work
    # Test only the program token with endswith(): the original
    # substring check ('.py' in work) also matched data files whose
    # names merely contained ".py", wrongly invoking the interpreter.
    if prog.endswith('.py'):
        os.system('python ' + work)
    else:
        os.system(work)  # for './'
    return


def process_result(result):
    """Hook for post-processing a slave's result; intentionally a no-op.

    master() currently ignores results (the calls to this hook are
    commented out there); fill this in if the slaves return data.
    """
    pass


def master(comm):
    """Coordinate rank: hand out work units to slaves, dynamically balanced.

    Seeds each slave with one unit, then sends a fresh unit to whichever
    slave reports back first.  When the queue is empty, drains the
    outstanding results and tells every slave to exit with DIETAG.
    """
    num_procs = comm.Get_size()
    status = MPI.Status()

    # generate work queue from the command line: prog file1 file2 ...
    wq = Work(sys.argv[1], sys.argv[2:])

    # Seed the slaves, one unit of work per slave rank.  Track how many
    # slaves actually received work: with fewer units than slaves the
    # original code sent None under WORKTAG (crashing the slave in
    # do_work) and later waited for num_procs-1 results, deadlocking.
    num_active = 0
    for rank in xrange(1, num_procs):
        work = wq.get_next()
        if work is None:
            break
        comm.send(work, dest=rank, tag=WORKTAG)
        num_active += 1

    # Loop over getting new work requests until there is no more work
    while True:
        work = wq.get_next()
        if not work:
            break

        # Receive a result from whichever slave finishes first
        result = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        #process_result(result)

        # Send that same slave a new work unit
        comm.send(work, dest=status.Get_source(), tag=WORKTAG)

    # No more work to hand out: receive only the results still in
    # flight (one per slave that was actually given work).
    for _ in xrange(num_active):
        result = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        #process_result(result)

    # Tell all the slaves to exit by sending an empty message with DIETAG
    for rank in xrange(1, num_procs):
        comm.send(0, dest=rank, tag=DIETAG)


def slave(comm):
    """Worker rank: execute commands from rank 0 until DIETAG arrives."""
    my_rank = comm.Get_rank()
    status = MPI.Status()

    while True:
        # Block until the master sends us something
        task = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)

        # A DIETAG message means there is no more work -- shut down
        if status.Get_tag() == DIETAG:
            break

        # Execute the unit and report back to the master (tag 0)
        outcome = do_work(task)
        comm.send(outcome, dest=0, tag=0)
        

def main():
    """Entry point: rank 0 acts as master, all other ranks as slaves."""
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    my_name = MPI.Get_processor_name()
    # For wall-clock timing, bracket the dispatch below with
    # comm.Barrier() and MPI.Wtime().

    if rank != 0:
        slave(comm)
    else:
        master(comm)


if __name__ == '__main__':
    main()

Submit parallel application

Script jobs.sh:

#!/bin/bash
# PBS submission script: requests resources and launches the MPI job.

#PBS -A fpaolo             # account name
#PBS -q hotel              # queue name
#PBS -N test               # job name
#PBS -o test.out           # output file name
#PBS -e test.err           # error file name
#PBS -l nodes=1:ppn=2      # nodes, process-per-node
#PBS -l walltime=0:00:10   # requested time h:mm:ss
#PBS -M fpaolo@ucsd.edu    # email to receive messages
#PBS -m abe                # email on (a)bort, (b)egin, (e)nd
#PBS -V                    # export current environment variables to the job

# print information about this job for the log
echo 'PBS_JOBID:' $PBS_JOBID
echo 'PBS_O_WORKDIR:' $PBS_O_WORKDIR
echo 'PBS_O_QUEUE:' $PBS_O_QUEUE
echo 'PBS_NODEFILE:' $PBS_NODEFILE

# Change to the directory the job was submitted from
# (PBS starts jobs in $HOME by default)
cd $PBS_O_WORKDIR

# run programs
mpiexec -v -machinefile $PBS_NODEFILE python -c "print 'hello world'"

#mpiexec -v -machinefile $PBS_NODEFILE python mpi-submit.py '/path/to/prog.py' ~/path/to/data/file.*
#
# notes
# -----
# mpi-submit.py - distributes the workload => map(prog.py, [f1, f2, ...])
# prog.py - program to run on each file independently
# file.* - data files to process in parallel
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment