Created
January 29, 2013 22:47
-
-
Save cgranade/4668712 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env mpiexec -n 4 python | |
# -*- coding: utf-8 -*- | |
## | |
# ssp.py: Demonstration of stupid simple pooling using mpi4py. | |
## | |
# By Christopher E. Granade, 2013. | |
# Committed to the public domain except where otherwise noted.
## | |
## IMPORTS ##################################################################### | |
from mpi4py import MPI | |
import time | |
import sys | |
import random | |
import itertools | |
import numpy | |
## CONSTANTS ################################################################### | |
# Process-wide MPI handles: the world communicator, then this process's rank
# within it and the total number of processes taking part.
COMM = MPI.COMM_WORLD
RANK, SIZE = COMM.Get_rank(), COMM.Get_size()
## FUNCTIONS ################################################################### | |
def enum(*sequential, **named):
    """
    Build a lightweight enumeration class.

    Positional names are numbered 0, 1, 2, ... in the order given; keyword
    arguments supply explicit ``name=value`` pairs (and override a positional
    name of the same spelling). The returned class exposes every name as a
    class attribute and carries a ``reverse_mapping`` dict that maps each
    value back to its name.

    (c) 2013 StackExchange.
    Licensed under Creative Commons BY-SA 3.0:
    http://creativecommons.org/licenses/by-sa/3.0/
    This function is based on the answer at
    http://stackoverflow.com/a/1695250
    by Alec Thomas.
    """
    enums = dict(zip(sequential, range(len(sequential))), **named)
    # .items() instead of .iteritems(): the latter only exists on Python 2,
    # whereas .items() behaves correctly on both major versions.
    reverse = dict((value, key) for key, value in enums.items())
    enums['reverse_mapping'] = reverse
    return type('Enum', (), enums)
# Example server and client. | |
def server():
    """
    Hand a fixed list of tasks out to the worker ranks, then tell them to stop.

    Workers (ranks 1 .. SIZE-1) are polled round-robin. For each task a
    Control.NEW word is sent non-blockingly on the control tag; if that send
    has completed after a 50 ms grace period, the task object itself is sent
    on the task tag. Otherwise the send is cancelled and the next worker is
    tried. Once every task has been placed, each worker is sent Control.DONE.

    Raises:
        RuntimeError: if there are no worker ranks (SIZE < 2).
    """
    tasks = "When will all of these words get printed?".split(" ")

    workers = list(range(1, SIZE))
    if not workers:
        # With no workers the polling loop below could never place a task
        # and would spin forever, so fail loudly instead.
        raise RuntimeError(
            "server() needs at least one worker rank; run with 2+ processes."
        )

    # Endless round-robin over the workers. The iterator is shared between
    # tasks so the search resumes after the worker that took the last task.
    pool = itertools.cycle(workers)

    # The control word is read-only, so one buffer serves every NEW send.
    new_msg = numpy.empty(1, dtype='i')
    new_msg[0] = Control.NEW

    for task in tasks:
        for worker in pool:
            req = COMM.Isend([new_msg, MPI.INT], dest=worker, tag=Tags.CONTROL)
            time.sleep(0.05)  # Wait for a response for 50ms.
            accepted = req.Test()
            if not accepted:
                # Guess that worker was busy... A cancelled request must
                # still be completed, and the cancel itself may fail (the
                # message went out after all). In that case the worker WILL
                # see NEW and wait for a task, so treat it as accepted.
                req.Cancel()
                status = MPI.Status()
                req.Wait(status)
                accepted = not status.Is_cancelled()
            if accepted:
                # The task has been accepted. Yay!
                COMM.send(task, dest=worker, tag=Tags.TASK)
                # Now to break out of the worker search and to go onto
                # the next task.
                break

    # We're out of tasks, so tell each worker that. Keep the requests (and
    # the buffer they read from) alive until every send has completed.
    done_msg = numpy.empty(1, dtype='i')
    done_msg[0] = Control.DONE
    pending = [
        COMM.Isend([done_msg, MPI.INT], dest=worker, tag=Tags.CONTROL)
        for worker in workers
    ]
    MPI.Request.Waitall(pending)
def client():
    """
    Worker loop: obey control words from rank 0 until told to stop.

    Each iteration blocks on a one-integer control message from rank 0.
    Control.NEW means a pickled task object follows on the task tag; the
    example "work" is to sleep for a random time under one second and then
    print this rank and the task. Control.DONE ends the loop. Any other
    control value raises ValueError.
    """
    while True:
        # Control words are always a single MPI int, so the buffer-based
        # (upper-case) receive is used for them.
        control_buf = numpy.empty(1, dtype='i')
        COMM.Recv([control_buf, MPI.INT], source=0, tag=Tags.CONTROL)
        opcode = control_buf[0]

        if opcode == Control.DONE:
            # Told to exit; nothing follows the loop, so just return.
            return

        if opcode != Control.NEW:
            raise ValueError(
                "Control instruction {} not recognized!".format(opcode)
            )

        # A task is on its way: fetch the Python object, pretend to work
        # on it for a random while, then report it.
        payload = COMM.recv(source=0, tag=Tags.TASK)
        time.sleep(random.random())
        report = "{rank}\t{task}\n".format(rank=RANK, task=payload)
        sys.stdout.write(report)
## ENUMS ####################################################################### | |
# Wire values are spelled out explicitly: message tags separate control words
# from task payloads, and control codes are the instructions the server sends.
Tags = enum(CONTROL=0, TASK=1)
Control = enum(NEW=0, DONE=1)
## MAIN PROGRAM ################################################################
if __name__ == "__main__":
    # Rank 0 coordinates the pool; every other rank runs the worker loop.
    role = server if RANK == 0 else client
    role()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment