Created
April 24, 2013 23:36
-
-
Save andresriancho/5456482 to your computer and use it in GitHub Desktop.
My modified pool.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# Module providing the `Pool` class for managing a process pool | |
# | |
# multiprocessing/pool.py | |
# | |
# Copyright (c) 2006-2008, R Oudkerk | |
# All rights reserved. | |
# | |
# Redistribution and use in source and binary forms, with or without | |
# modification, are permitted provided that the following conditions | |
# are met: | |
# | |
# 1. Redistributions of source code must retain the above copyright | |
# notice, this list of conditions and the following disclaimer. | |
# 2. Redistributions in binary form must reproduce the above copyright | |
# notice, this list of conditions and the following disclaimer in the | |
# documentation and/or other materials provided with the distribution. | |
# 3. Neither the name of author nor the names of any contributors may be | |
# used to endorse or promote products derived from this software | |
# without specific prior written permission. | |
# | |
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND | |
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE | |
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | |
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS | |
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) | |
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY | |
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF | |
# SUCH DAMAGE. | |
# | |
__all__ = ['Pool'] | |
# | |
# Imports | |
# | |
import threading | |
import Queue | |
import itertools | |
import collections | |
import time | |
import sys | |
import traceback | |
from multiprocessing import Process, cpu_count, TimeoutError | |
from multiprocessing.util import Finalize, debug | |
# | |
# Constants representing the state of a pool | |
# | |
# Lifecycle states stored on a pool and on each of its handler threads.
# RUN is the only falsy value; the handler loops rely on that when they
# test ``thread._state`` for truthiness.
RUN, CLOSE, TERMINATE = 0, 1, 2
# | |
# Miscellaneous | |
# | |
# Module-level source of job ids: each result object takes the next
# value from this counter when it is constructed.
job_counter = itertools.count(0)
def mapstar(args):
    """Call ``map`` with the unpacked ``args`` tuple and return a list.

    ``args`` is ``(func, iterable, ...)``; the pool submits it as
    ``(func, chunk)`` so that one task processes a whole chunk of items.

    The result is materialised with ``list`` so the function returns a
    real list on every Python version (on Python 2 ``map`` already does;
    on Python 3 it would otherwise be a lazy iterator).
    """
    return list(map(*args))
# | |
# Code run by worker processes | |
# | |
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    """Run tasks taken from ``inqueue`` and post outcomes to ``outqueue``.

    Parameters:
        inqueue:     object with a ``get()`` method yielding tasks of the
                     form ``(job, i, func, args, kwds)`` or ``None`` as a
                     stop sentinel.
        outqueue:    object with a ``put()`` method; receives
                     ``(job, i, result)`` for every executed task.
        initializer: optional callable invoked once as
                     ``initializer(*initargs)`` before the first task.
        maxtasks:    optional positive int; the loop ends after that many
                     tasks have been completed.

    ``result`` is ``(True, return_value)`` on success and
    ``(False, (exception, traceback_text))`` when ``func`` raised an
    ``Exception``; the traceback is formatted here because this is the
    only place where it is still available.
    """
    assert maxtasks is None or (isinstance(maxtasks, int) and maxtasks > 0)
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        # Pipe-backed queues: drop the ends this side never uses.
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or completed < maxtasks:
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            # format_exc() formats the exception currently being handled;
            # its only argument is a frame *limit*, so the traceback
            # object must not be passed to it.
            result = (False, (e, traceback.format_exc()))
        put((job, i, result))
        completed += 1
    debug('worker exiting after %d tasks' % completed)
# | |
# Class representing a process pool | |
# | |
class Pool(object):
    '''
    Pool of worker processes that executes submitted callables and
    delivers their outcomes, either blocking or through result objects.
    '''
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
        '''
        Set up the queues, start the workers and the three handler threads.

        `processes` defaults to `cpu_count()` (1 when that raises
        NotImplementedError).  `initializer(*initargs)` is handed to
        every worker, as is `maxtasksperchild`.

        Raises ValueError when `processes` < 1 and TypeError when
        `initializer` is neither None nor callable.
        '''
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        self._state = RUN
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if processes is None:
            try:
                processes = cpu_count()
            except NotImplementedError:
                processes = 1
        if processes < 1:
            raise ValueError("Number of processes must be at least 1")

        if not (initializer is None or hasattr(initializer, '__call__')):
            raise TypeError('initializer must be a callable')

        self._processes = processes
        self._pool = []
        self._repopulate_pool()

        def make_handler(target, args):
            # Each handler is a daemon thread that carries its own
            # ``_state`` flag; it is returned unstarted.
            handler = threading.Thread(target=target, args=args)
            handler.daemon = True
            handler._state = RUN
            return handler

        self._worker_handler = make_handler(Pool._handle_workers, (self, ))
        self._worker_handler.start()

        self._task_handler = make_handler(
            Pool._handle_tasks,
            (self._taskqueue, self._quick_put, self._outqueue, self._pool)
            )
        self._task_handler.start()

        self._result_handler = make_handler(
            Pool._handle_results,
            (self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._worker_handler, self._task_handler,
                  self._result_handler, self._cache),
            exitpriority=15
            )

    def _join_exited_workers(self):
        """Join and drop every worker whose `exitcode` is set.

        Returns True when at least one worker was removed.
        """
        reaped = False
        # Walk from the end so a deletion never shifts an index that is
        # still to be visited.
        for idx in range(len(self._pool) - 1, -1, -1):
            proc = self._pool[idx]
            if proc.exitcode is None:
                continue
            # worker exited
            debug('cleaning up worker %d' % idx)
            proc.join()
            reaped = True
            del self._pool[idx]
        return reaped

    def _repopulate_pool(self):
        """Start as many new workers as are missing from the configured
        number of processes.
        """
        missing = self._processes - len(self._pool)
        for _ in range(missing):
            proc = self.Process(
                target=worker,
                args=(self._inqueue, self._outqueue,
                      self._initializer,
                      self._initargs, self._maxtasksperchild)
                )
            self._pool.append(proc)
            proc.name = proc.name.replace('Process', 'PoolWorker')
            proc.daemon = True
            proc.start()
            debug('added worker')

    def _maintain_pool(self):
        """Replace exited workers: reap them, then top the pool up again.
        """
        if self._join_exited_workers():
            self._repopulate_pool()

    def _setup_queues(self):
        """Create the task and result queues plus their fast accessors."""
        from .queues import SimpleQueue
        self._inqueue = SimpleQueue()
        self._outqueue = SimpleQueue()
        # Bypass the queue wrappers and talk to the pipe ends directly.
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

    def apply(self, func, args=(), kwds={}):
        '''
        Blocking counterpart of `apply_async()`: run `func(*args, **kwds)`
        in the pool and return its result.
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Blocking counterpart of `map_async()`: return the list of results.
        '''
        assert self._state == RUN
        return self.map_async(func, iterable, chunksize).get()

    def imap(self, func, iterable, chunksize=1):
        '''
        Lazy, ordered variant of `map()` -- can be MUCH slower than
        `Pool.map()`.  `chunksize` must be >= 1.
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            tasks = ((result._job, i, func, (x,), {})
                     for i, x in enumerate(iterable))
            self._taskqueue.put((tasks, result._set_length))
            return result

        assert chunksize > 1
        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = IMapIterator(self._cache)
        tasks = ((result._job, i, mapstar, (x,), {})
                 for i, x in enumerate(task_batches))
        self._taskqueue.put((tasks, result._set_length))
        # Each delivered item is a whole chunk; flatten for the caller.
        return (item for chunk in result for item in chunk)

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` but results are yielded in arrival order.
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            tasks = ((result._job, i, func, (x,), {})
                     for i, x in enumerate(iterable))
            self._taskqueue.put((tasks, result._set_length))
            return result

        assert chunksize > 1
        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = IMapUnorderedIterator(self._cache)
        tasks = ((result._job, i, mapstar, (x,), {})
                 for i, x in enumerate(task_batches))
        self._taskqueue.put((tasks, result._set_length))
        return (item for chunk in result for item in chunk)

    def apply_async(self, func, args=(), kwds={}, callback=None):
        '''
        Queue `func(*args, **kwds)` and return an `ApplyResult` for it.
        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback)
        single_task = [(result._job, None, func, args, kwds)]
        self._taskqueue.put((single_task, None))
        return result

    def map_async(self, func, iterable, chunksize=None, callback=None):
        '''
        Queue `func` over `iterable` in chunks and return a `MapResult`.

        Without an explicit `chunksize` the items are split into roughly
        four chunks per pool worker.
        '''
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            quotient, leftover = divmod(len(iterable), len(self._pool) * 4)
            chunksize = quotient + 1 if leftover else quotient
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback)
        tasks = ((result._job, i, mapstar, (x,), {})
                 for i, x in enumerate(task_batches))
        self._taskqueue.put((tasks, None))
        return result

    @staticmethod
    def _handle_workers(pool):
        """Thread body: keep the worker set topped up, then stop the
        task handler by queueing a sentinel.
        """
        me = threading.current_thread()

        # Keep maintaining workers until the cache gets drained, unless the
        # pool is terminated.
        while me._state == RUN or (pool._cache and me._state != TERMINATE):
            pool._maintain_pool()
            time.sleep(0.1)
        # send sentinel to stop workers
        pool._taskqueue.put(None)
        debug('worker handler exiting')

    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool):
        """Thread body: forward queued task sequences to the workers.

        Each `taskqueue` entry is `(taskseq, set_length)`; `None` ends
        the loop.  On exit a sentinel is sent to the result handler and
        one to every worker.
        """
        me = threading.current_thread()

        for taskseq, set_length in iter(taskqueue.get, None):
            dispatched = 0
            interrupted = False
            for task in taskseq:
                if me._state:
                    debug('task handler found thread._state != RUN')
                    interrupted = True
                    break
                try:
                    put(task)
                except IOError:
                    debug('could not put task on queue')
                    interrupted = True
                    break
                dispatched += 1
            if interrupted:
                break
            if set_length:
                debug('doing set_length()')
                set_length(dispatched)
        else:
            debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            debug('task handler sending sentinel to workers')
            for _ in pool:
                put(None)
        except IOError:
            debug('task handler got IOError when sending sentinels')

        debug('task handler exiting')

    @staticmethod
    def _handle_results(outqueue, get, cache):
        """Thread body: route `(job, i, obj)` messages to `cache[job]`.

        Runs until a sentinel arrives or the thread is terminated, then
        keeps draining while jobs are still outstanding.
        """
        me = threading.current_thread()

        def deliver(message):
            job, i, obj = message
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        while True:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if me._state:
                assert me._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('result handler got sentinel')
                break

            deliver(task)

        while cache and me._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if task is None:
                debug('result handler ignoring extra sentinel')
                continue
            deliver(task)

        if hasattr(outqueue, '_reader'):
            debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block.  There is guaranteed to be no more than 2 sentinels.
            try:
                for _ in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (IOError, EOFError):
                pass

        debug('result handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), me._state)

    @staticmethod
    def _get_tasks(func, it, size):
        """Yield `(func, chunk)` pairs where each chunk is a tuple of up
        to `size` consecutive items of `it`.
        """
        source = iter(it)

        def take():
            return tuple(itertools.islice(source, size))

        # An empty tuple means the source is exhausted.
        for chunk in iter(take, ()):
            yield (func, chunk)

    def __reduce__(self):
        raise NotImplementedError(
            'pool objects cannot be passed between processes or pickled'
            )

    def close(self):
        '''
        Move a running pool to CLOSE; a pool in any other state is left
        as it is.
        '''
        debug('closing pool')
        if self._state != RUN:
            return
        self._state = CLOSE
        self._worker_handler._state = CLOSE

    def terminate(self):
        '''
        Mark the pool terminated and run its finalizer.
        '''
        debug('terminating pool')
        self._state = TERMINATE
        self._worker_handler._state = TERMINATE
        self._terminate()

    def join(self):
        '''
        Wait for the handler threads and then for every worker.  The
        pool must already be closed or terminated.
        '''
        debug('joining pool')
        assert self._state in (CLOSE, TERMINATE)
        for handler in (self._worker_handler,
                        self._task_handler,
                        self._result_handler):
            handler.join()
        for proc in self._pool:
            proc.join()

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        """Drain `inqueue` while the task handler is alive so that a
        handler blocked on a full queue can make progress.
        """
        # task_handler may be blocked trying to put items on inqueue
        debug('removing tasks from inqueue until task handler finished')
        # The read lock is taken and not released in this function.
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                        worker_handler, task_handler, result_handler, cache):
        """Finalizer: stop the handler threads and terminate the workers."""
        # this is guaranteed to only be called once
        debug('finalizing pool')

        worker_handler._state = TERMINATE
        task_handler._state = TERMINATE

        debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        assert result_handler.is_alive() or len(cache) == 0

        result_handler._state = TERMINATE
        outqueue.put(None)                  # sentinel

        # We must wait for the worker handler to exit before terminating
        # workers because we don't want workers to be restarted behind our
        # back.
        debug('joining worker handler')
        worker_handler.join()

        # Terminate workers which haven't already finished.
        if pool and hasattr(pool[0], 'terminate'):
            debug('terminating workers')
            for proc in pool:
                if proc.exitcode is None:
                    proc.terminate()

        debug('joining task handler')
        task_handler.join(1e100)

        debug('joining result handler')
        result_handler.join(1e100)

        if pool and hasattr(pool[0], 'terminate'):
            debug('joining pool workers')
            for proc in pool:
                if proc.is_alive():
                    # worker has not yet exited
                    debug('cleaning up worker %d' % proc.pid)
                    proc.join()
# | |
# Class whose instances are returned by `Pool.apply_async()` | |
# | |
class ApplyResult(object):
    """Handle for the outcome of one call submitted to a pool.

    The instance registers itself in ``cache`` under a fresh job id and
    removes itself again once the outcome has been stored by ``_set``.
    """

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

    def ready(self):
        """Return True once an outcome has been stored."""
        return self._ready

    def successful(self):
        """Return whether the call succeeded; only valid when ready."""
        assert self._ready
        return self._success

    def wait(self, timeout=None):
        """Block until the outcome is stored or ``timeout`` elapses."""
        with self._cond:
            if not self._ready:
                self._cond.wait(timeout)

    def get(self, timeout=None):
        """Return the call's value, waiting up to ``timeout`` for it.

        Raises TimeoutError when no outcome arrived in time.  When the
        call failed, the stored traceback text is printed and the
        stored exception is raised.
        """
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if not self._success:
            # Do something with the exception here, the simplest (but
            # ugliest) thing to do is to simply print it to the console
            print(self._value[1])
            raise self._value[0]
        return self._value

    def _set(self, i, obj):
        """Store the ``(success, value)`` pair ``obj``, run the callback
        on success, wake a waiter and leave the cache.
        """
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        with self._cond:
            self._ready = True
            self._cond.notify()
        del self._cache[self._job]
# | |
# Class whose instances are returned by `Pool.map_async()` | |
# | |
class MapResult(ApplyResult):
    """Handle for the outcome of a chunked ``map`` submitted to a pool.

    ``length`` is the number of input items and ``chunksize`` the number
    of items per task; results arrive chunk by chunk through ``_set``.
    """

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            # No chunk will ever be delivered, so ``_set`` never runs:
            # the job is complete right away and must leave the cache
            # here, otherwise the cache never drains and the pool's
            # worker handler keeps waiting for it after ``close()``.
            self._number_left = 0
            self._ready = True
            del cache[self._job]
        else:
            self._number_left = length // chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        """Record the outcome of chunk number ``i``.

        ``success_result`` is ``(success, result)``.  A successful chunk
        is copied into its slice of the value list; once the last chunk
        is in, the callback runs and waiters are woken.  The first
        failed chunk ends the whole job: its ``result`` becomes the
        value and the callback is not invoked.
        """
        success, result = success_result
        if success:
            start = i * self._chunksize
            self._value[start:start + self._chunksize] = result
            self._number_left -= 1
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                with self._cond:
                    self._ready = True
                    self._cond.notify()
        else:
            self._success = False
            self._value = result
            del self._cache[self._job]
            with self._cond:
                self._ready = True
                self._cond.notify()
# | |
# Class whose instances are returned by `Pool.imap()` | |
# | |
class IMapIterator(object):
    """Iterator over the results of one ``imap`` job, in submission order.

    Results delivered out of order are parked in ``_unsorted`` until
    every earlier index has arrived.  The instance registers itself in
    ``cache`` and removes itself once all results are in.
    """

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._cache = cache
        self._items = collections.deque()
        self._index = 0           # number of results released so far
        self._length = None       # total number of results, once known
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        """Return the next result, waiting up to ``timeout`` for it.

        Raises StopIteration when all results were consumed and
        TimeoutError when nothing arrived in time.  A failed task
        re-raises its exception: when the failure payload is an
        ``(exception, traceback_text)`` tuple, the traceback text is
        printed first and the exception itself is raised.
        """
        with self._cond:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration
                    raise TimeoutError

        success, value = item
        if success:
            return value
        if isinstance(value, tuple):
            # Failure payload as produced by the worker: raising the
            # tuple itself would lose the traceback text and is not a
            # valid ``raise`` operand on Python 3.
            error, tb_text = value
            print(tb_text)
            raise error
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        """Accept result ``obj`` for index ``i`` and release every
        result that is now contiguous with the ones already released.
        """
        with self._cond:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]

    def _set_length(self, length):
        """Record the total number of results; when they have all
        arrived already, wake a waiter and leave the cache.
        """
        with self._cond:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
# | |
# Class whose instances are returned by `Pool.imap_unordered()` | |
# | |
class IMapUnorderedIterator(IMapIterator):
    """Variant of ``IMapIterator`` that releases results in arrival order."""

    def _set(self, i, obj):
        """Queue ``obj`` immediately, ignoring its index ``i``, and
        leave the cache once the known total has been reached.
        """
        cond = self._cond
        with cond:
            self._items.append(obj)
            self._index += 1
            cond.notify()
            finished = self._index == self._length
            if finished:
                del self._cache[self._job]
#
# Class representing a pool of worker threads with the `Pool` interface
#
class ThreadPool(Pool):
    """``Pool`` variant whose workers are created from ``.dummy.Process``
    and which communicates through in-memory ``Queue.Queue`` objects.
    """

    from .dummy import Process

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        """Create the task and result queues and expose their blocking
        ``put``/``get`` as the fast accessors.
        """
        inbox, outbox = Queue.Queue(), Queue.Queue()
        self._inqueue = inbox
        self._outqueue = outbox
        self._quick_put = inbox.put
        self._quick_get = outbox.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        """Replace everything pending in ``inqueue`` with ``size``
        sentinels and wake all blocked consumers.
        """
        # put sentinels at head of inqueue to make workers finish
        sentinels = [None] * size
        with inqueue.not_empty:
            inqueue.queue.clear()
            inqueue.queue.extend(sentinels)
            inqueue.not_empty.notify_all()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment