Skip to content

Instantly share code, notes, and snippets.

@cgranade
Created January 29, 2013 22:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cgranade/4668712 to your computer and use it in GitHub Desktop.
Save cgranade/4668712 to your computer and use it in GitHub Desktop.
#!/usr/bin/env mpiexec -n 4 python
# -*- coding: utf-8 -*-
##
# ssp.py: Demonstration of stupid simple pooling using mpi4py.
##
# By Christopher E. Granade, 2013.
# Committed to the public domain except where otherwise noted.
##
## IMPORTS #####################################################################
from mpi4py import MPI
import time
import sys
import random
import itertools
import numpy
## CONSTANTS ###################################################################
# Communicator used for every message exchanged in this file.
COMM = MPI.COMM_WORLD
# This process's rank within COMM: rank 0 runs server(), all others client().
RANK = COMM.Get_rank()
# Number of processes in COMM; server() treats ranks 1..SIZE-1 as workers.
SIZE = COMM.Get_size()
## FUNCTIONS ###################################################################
def enum(*sequential, **named):
    """
    Build a lightweight enumeration class.

    Positional names are numbered consecutively from 0 in the order given;
    keyword arguments supply explicit ``name=value`` members.  The returned
    class exposes every member as a class attribute, plus a
    ``reverse_mapping`` dict that maps each value back to its name.

    Example::

        Colors = enum("RED", "GREEN", BLUE=10)
        Colors.GREEN                  # 1
        Colors.reverse_mapping[10]    # "BLUE"

    (c) 2013 StackExchange.
    Licensed under Creative Commons BY-SA 3.0:
    http://creativecommons.org/licenses/by-sa/3.0/
    This function is based on the answer at
    http://stackoverflow.com/a/1695250
    by Alec Thomas.
    """
    enums = dict(zip(sequential, range(len(sequential))), **named)
    # .items() (rather than the Python 2-only .iteritems()) keeps this working
    # on both Python 2 and Python 3.  The reverse map is built before
    # 'reverse_mapping' itself is added, so it only contains real members.
    reverse = {value: key for key, value in enums.items()}
    enums['reverse_mapping'] = reverse
    return type('Enum', (), enums)
# Example server and client.
def server():
    """
    Example server: hand out a fixed list of tasks, then shut the workers down.

    For each task the server offers a ``Control.NEW`` instruction to the
    workers (ranks ``1..SIZE-1``) in round-robin order.  An offer is a
    non-blocking send on ``Tags.CONTROL``; if it has not completed after
    50 ms the worker is assumed to be busy, the offer is withdrawn and the
    next worker is tried.  Once an offer goes through, the task itself is sent
    on ``Tags.TASK``.  After the last task every worker receives
    ``Control.DONE``.

    Raises:
        RuntimeError: if there are no worker ranks (``SIZE < 2``); without
            this check the round-robin search would spin forever.
    """
    tasks = "When will all of these words get printed?".split(" ")

    if SIZE < 2:
        raise RuntimeError(
            "server() needs at least one worker rank; run with two or more "
            "MPI processes (got SIZE={0}).".format(SIZE)
        )

    # Endless round-robin over the worker ranks.  The iterator is shared by
    # all tasks, so each search resumes where the previous one stopped.
    pool = itertools.cycle(range(1, SIZE))

    # The control buffer is only reused after the previous send on it has
    # completed (Test() succeeded or Wait() returned), so one buffer suffices.
    data = numpy.empty(1, dtype='i')
    data[0] = Control.NEW
    status = MPI.Status()

    for task in tasks:
        for worker in pool:
            req = COMM.Isend([data, MPI.INT], dest=worker, tag=Tags.CONTROL)
            time.sleep(0.05)  # Wait for a response for 50ms.
            if not req.Test():
                # Guess that worker was busy: try to withdraw the offer.  A
                # cancelled request must still be completed, and the cancel
                # may lose the race against delivery, so check the outcome.
                req.Cancel()
                req.Wait(status=status)
                if status.Is_cancelled():
                    continue
                # The offer was delivered after all; the worker is now
                # waiting for a task, so fall through and send it.
            # The task has been accepted. Yay!
            COMM.send(task, dest=worker, tag=Tags.TASK)
            # Break out of the worker search and go on to the next task.
            break

    # We're out of tasks, so tell each worker that.  The buffer and the
    # requests are kept alive until every send has completed.
    done = numpy.empty(1, dtype='i')
    done[0] = Control.DONE
    requests = [
        COMM.Isend([done, MPI.INT], dest=worker, tag=Tags.CONTROL)
        for worker in range(1, SIZE)
    ]
    MPI.Request.Waitall(requests)
def client():
    """
    Example worker: obey control instructions from rank 0 until told to stop.

    Each pass blocks on a single-int control message (``Tags.CONTROL``) from
    rank 0.  ``Control.NEW`` means a pickled task follows on ``Tags.TASK``;
    the "work" is to sleep for a random fraction of a second and then print
    this rank and the task.  ``Control.DONE`` ends the loop.

    Raises:
        ValueError: if the control value is neither ``NEW`` nor ``DONE``.
    """
    while True:
        # Control messages are always a raw MPI int, hence the buffer-based
        # upper-case Recv rather than the pickling lower-case recv.
        control_buf = numpy.empty(1, dtype='i')
        COMM.Recv([control_buf, MPI.INT], source=0, tag=Tags.CONTROL)
        instruction = control_buf[0]

        if instruction == Control.DONE:
            # Told to exit: nothing follows the loop, so just return.
            return

        if instruction != Control.NEW:
            raise ValueError(
                "Control instruction {} not recognized!".format(instruction)
            )

        # A new task is on its way; fetch it, pretend to work, then report.
        payload = COMM.recv(source=0, tag=Tags.TASK)
        time.sleep(random.random())
        report = "{rank}\t{task}\n".format(rank=RANK, task=payload)
        sys.stdout.write(report)
## ENUMS #######################################################################
# Message tags separating control messages from task payloads.  enum()
# numbers positional names from 0, so CONTROL == 0 and TASK == 1.
Tags = enum("CONTROL", "TASK")
# Instructions carried by a CONTROL message: NEW (0) announces that a task
# follows, DONE (1) tells the worker to leave its loop.
Control = enum("NEW", "DONE")
## MAIN PROGRAM ################################################################
if __name__ == "__main__":
    # Rank 0 coordinates the pool; every other rank acts as a worker.
    role = server if RANK == 0 else client
    role()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment