Created
August 30, 2011 19:30
-
-
Save DougBurke/1181790 to your computer and use it in GitHub Desktop.
Simple, lightly-tested, parallel pool implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# Copyright (C) 2011 Smithsonian Astrophysical Observatory | |
# | |
# | |
# This program is free software; you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation; either version 2 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License along | |
# with this program; if not, write to the Free Software Foundation, Inc., | |
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
# | |
""" | |
Test routines for taking advantage of multiple processors. | |
This is a slightly different approach to that provided by | |
sherpa.utils.parallel_map and is provided for testing. Please | |
be aware that at present this code has seen limited testing and | |
has no guarantee of stability. | |
Sherpa's parallel_map splits up the work load at creation time and | |
farms off a set of tasks to each processor. Here we use a pool of jobs, | |
with each worker extracting the next job when it is free. | |
See, amongst others: | |
http://docs.python.org/library/multiprocessing.html | |
http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/ | |
""" | |
import sys | |
import os | |
import time | |
import signal | |
import Queue | |
import multiprocessing | |
# from ciao_contrib.logger_wrapper import initialize_module_logger | |
__all__ = ("parallel_pool", ) | |
# logger = initialize_module_logger("parallel_wrapper") | |
# v3 = logger.verbose3 | |
# v4 = logger.verbose4 | |
# v5 = logger.verbose4 | |
def task(func, arg_queue, result_queue): | |
"""Remove a task from the arg_queue (ie the next argument to use) | |
and call func. Store the result in result_queue. | |
Repeat until arg_queue is empty. | |
""" | |
# note we block control-c handling here | |
signal.signal(signal.SIGINT, signal.SIG_IGN) | |
while not arg_queue.empty(): | |
try: | |
(i, arg) = arg_queue.get(block=False) | |
# v5("# Parallel worker starting task #{0}".format(i+1)) | |
ans = func(arg) | |
result_queue.put((i, ans)) | |
except Queue.Empty: | |
pass | |
def parallel_pool(func, args, ncores=None): | |
"""Process func in parallel, once for each argument in args. | |
func takes a single parameter, so you will normally need to write | |
a wrapper routine something like the following, and use it as an | |
argument to parallel_pool. | |
def wrapper(arg): | |
actual_func(*arg) | |
One issue with this design is that it does not support keyword | |
arguments. | |
If ncores is None then uses multiprocessing.cpu_count(). | |
The return value is an array of the return values of func, | |
in the order of the args array. | |
""" | |
if ncores == None: | |
nc = multiprocessing.cpu_count() | |
else: | |
nc = ncores | |
narg = len(args) | |
if nc > narg: | |
nc = narg | |
# v3("# Parallel processing: {0} processes with {1} cores".format(narg, nc)) | |
job_queue = multiprocessing.Queue() | |
result_queue = multiprocessing.Queue() | |
for i in range(narg): | |
job_queue.put((i, args[i])) | |
stime = time.localtime() | |
# v4("# Parallel start time: {0}".format(time.asctime(stime))) | |
workers = [] | |
for i in range(nc): | |
# v5("# Starting parallel worker: {0}".format(i+1)) | |
w = multiprocessing.Process(target=task, args=(func, job_queue, result_queue)) | |
w.start() | |
workers.append(w) | |
try: | |
for w in workers: | |
# v5("# Joining worker to parallel queue") | |
w.join() | |
except KeyboardInterrupt: | |
# v5("# Parallel execution aborted by control c") | |
for w in workers: | |
w.terminate() | |
w.join() | |
etime = time.localtime() | |
# v4("# Parallel end time: {0}".format(time.asctime(etime))) | |
dtime = time.mktime(etime) - time.mktime(stime) | |
# v3("# Parallel run took: {0} seconds".format(dtime)) | |
out = [None] * narg | |
while not result_queue.empty(): | |
(n,v) = result_queue.get(block=False) | |
out[n] = v | |
return out | |
# End |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment