
Simple parallelization using mpi4py

For basic applications, MPI is as easy to use as any other message-passing system. The sample scripts below contain the complete communications skeleton for a data-parallel (or "embarrassingly parallel") problem using the mpi4py package.

Within the code is a description of the few functions needed to write typical parallel applications.

mpi-submit.py - Parallel application with simple partitioning: unbalanced load.

mpi-submit2.py - Parallel application with master/slave scheme: dynamically balanced load.

jobs.sh - Example PBS script to run the parallel jobs on an HPC cluster.

All the code is available here: https://github.com/fspaolo/mpisubmit
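Before diving into the full scripts, here is a minimal mpi4py sketch (illustrative only, not part of the repository) showing the two calls both scripts below are built on, Get_rank() and Get_size():

#!/usr/bin/env python
# minimal mpi4py example (hypothetical file name: hello_mpi.py)
# run with, e.g.: mpiexec -n 4 python hello_mpi.py
from mpi4py import MPI

comm = MPI.COMM_WORLD    # communicator spanning all started processes
print 'rank %d of %d' % (comm.Get_rank(), comm.Get_size())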

Unbalanced parallelization (simple partitioning)

Script mpi-submit.py:

#!/usr/bin/env python
"""
Parallel application with simple partitioning (unbalanced load)

"""
# Fernando Paolo <fpaolo@ucsd.edu>
# Jan 15, 2013

import os
import sys
from mpi4py import MPI


def simple_partitioning(length, num_procs):
    # base chunk size per process (integer division)
    sublengths = [length // num_procs] * num_procs
    for i in range(length % num_procs):    # distribute the remainder
        sublengths[i] += 1
    return sublengths
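# example (hypothetical, not in the original script):
#   simple_partitioning(10, 3) -> [4, 3, 3]
# the first (10 % 3) = 1 process gets one extra item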

def get_subproblem_input_args(input_args, my_rank, num_procs):
    # select this process's contiguous slice of the input arguments
    sub_ns = simple_partitioning(len(input_args), num_procs)
    my_offset = sum(sub_ns[:my_rank])
    my_input_args = input_args[my_offset:my_offset+sub_ns[my_rank]]
    return my_input_args

def program_to_run(string):
    # prepend the Python interpreter if the command names a .py script
    if '.py' in string:
        run = 'python '
    else:
        run = ''    # assume an executable, e.g. './prog'
    return run


comm = MPI.COMM_WORLD
my_rank = comm.Get_rank()
num_procs = comm.Get_size()

prog_and_args = sys.argv[1]    # program to run (plus its options)
files_in = sys.argv[2:]        # input files to distribute

run = program_to_run(prog_and_args)
my_files = get_subproblem_input_args(files_in, my_rank, num_procs)
cmd = '%s%s %s' % (run, prog_and_args, ' '.join(my_files))
os.system(cmd)

print cmd
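A quick way to try it from the command line (hypothetical file names; any mpiexec/mpirun will do):

# 4 MPI processes, 6 input files -> ranks get 2, 2, 1 and 1 files each
mpiexec -n 4 python mpi-submit.py prog.py data1.txt data2.txt data3.txt data4.txt data5.txt data6.txt

Each rank builds and runs a single command such as 'python prog.py data1.txt data2.txt', so a rank that happens to receive large files finishes later than the others; this is why the load is unbalanced.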

Balanced parallelization (master/slave)

Script mpi-submit2.py:

#!/usr/bin/env python
"""
For basic applications, MPI is as easy to use as any other 
message-passing system. The sample code below contains the complete 
communications skeleton for a dynamically load balanced master/slave 
application. Following the code is a description of the few functions 
necessary to write typical parallel applications.

important parameters
--------------------

status = MPI.Status()               # where all info is stored

# Receive results from a slave
result = comm.recv(                 # message buffer
         source=MPI.ANY_SOURCE,     # receive from any sender (-1)
         tag=MPI.ANY_TAG,           # any type of message (-1)
         status=status)             # info about the received msg (class)

# Send the slave a new work unit
comm.send(work,                     # message buffer
         dest=status.Get_source(),  # to whom we just received from
         tag=WORKTAG)               # user chosen message tag

"""
# Fernando Paolo <fpaolo@ucsd.edu>
# Jan 15, 2013

import os
import sys
from mpi4py import MPI
from Queue import Queue    # Python 2 module (renamed 'queue' in Python 3)

WORKTAG = 1
DIETAG = 0

class Work(object):
    def __init__(self, prog, files):
        # important: sort by file size in decreasing order, so the
        # longest-running jobs are dispatched first
        files.sort(key=lambda f: os.stat(f).st_size, reverse=True)
        q = Queue()
        for f in files:
            q.put(' '.join([prog, f]))
        self.work = q

    def get_next(self):
        if self.work.empty():
            return None
        return self.work.get()
 

def do_work(work):
    # run one work unit as a shell command and return its exit status
    if '.py' in work:
        return os.system('python ' + work)
    else:
        return os.system(work)    # e.g. './prog file'


def process_result(result):
    pass


def master(comm):
    num_procs = comm.Get_size()
    status = MPI.Status()
    
    # generate work queue
    wq = Work(sys.argv[1], sys.argv[2:])

    # Seed the slaves: send one unit of work to each slave (rank);
    # assumes there are at least num_procs-1 units in the queue
    for rank in xrange(1, num_procs):
        work = wq.get_next()
        comm.send(work, dest=rank, tag=WORKTAG)
    
    # Loop over getting new work requests until there is no more work to be done
    while True:
        work = wq.get_next()
        if not work: break
    
        # Receive results from a slave
        result = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        #process_result(result)

        # Send the slave a new work unit
        comm.send(work, dest=status.Get_source(), tag=WORKTAG)
    
    # No more work to be done, receive all outstanding results from slaves
    for rank in xrange(1, num_procs): 
        result = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        #process_result(result)

    # Tell all the slaves to exit by sending an empty message with DIETAG
    for rank in xrange(1, num_procs):
        comm.send(0, dest=rank, tag=DIETAG)


def slave(comm):
    my_rank = comm.Get_rank()
    status = MPI.Status()

    while True:
        # Receive a message from the master
        work = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)

        # Check the tag of the received message
        if status.Get_tag() == DIETAG: break 

        # Do the work
        result = do_work(work)

        # Send the result back
        comm.send(result, dest=0, tag=0)
        

def main():
    comm = MPI.COMM_WORLD
    my_rank = comm.Get_rank()
    my_name = MPI.Get_processor_name()
    #comm.Barrier()
    #start = MPI.Wtime()
    
    if my_rank == 0:
        master(comm)
    else:
        slave(comm)
    
    #comm.Barrier()
    #end = MPI.Wtime()
    #print 'time:', end - start


if __name__ == '__main__':
    main()
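Usage is the same as for mpi-submit.py (file names again hypothetical), but rank 0 now acts only as the master, so ask for one process more than the number of workers you want:

# 1 master + 3 slaves; files are dispatched one at a time
mpiexec -n 4 python mpi-submit2.py prog.py data1.txt data2.txt data3.txt data4.txt data5.txt data6.txt

Because each slave requests a new file only after finishing the previous one, faster ranks automatically process more files; together with the largest-first sorting in Work, this keeps all ranks busy until the queue is empty.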

Submitting the parallel application

Script jobs.sh:

#!/bin/bash

#PBS -A fpaolo             # account name
#PBS -q hotel              # queue name
#PBS -N test               # job name
#PBS -o test.out           # output file name
#PBS -e test.err           # error file name
#PBS -l nodes=1:ppn=2      # nodes, processes per node
#PBS -l walltime=0:00:10   # requested time h:mm:ss
#PBS -M fpaolo@ucsd.edu    # email to receive messages
#PBS -m abe                # email on abort, begin and end
#PBS -V                    # export current environment to the job

# print information
echo 'PBS_JOBID:' $PBS_JOBID
echo 'PBS_O_WORKDIR:' $PBS_O_WORKDIR
echo 'PBS_O_QUEUE:' $PBS_O_QUEUE
echo 'PBS_NODEFILE:' $PBS_NODEFILE

# Change to current workdir
cd $PBS_O_WORKDIR

# run programs
mpiexec -v -machinefile $PBS_NODEFILE python -c "print 'hello world'"

#mpiexec -v -machinefile $PBS_NODEFILE python mpi-submit.py '/path/to/prog.py' ~/path/to/data/file.*
#
# notes
# -----
# mpi-submit.py - distributes the workload => map(prog.py, [f1, f2, ...])
# prog.py - program to run on each file independently
# file.* - data files to process in parallel
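To submit the job and monitor it (standard PBS/Torque commands):

qsub jobs.sh      # submit to the queue
qstat -u $USER    # check job status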