Created
November 11, 2010 01:39
-
-
Save santa4nt/671838 to your computer and use it in GitHub Desktop.
A simple thread pool implementation.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""A thread pool implementation. | |
Parts of this module are derived from: | |
http://code.activestate.com/recipes/203871-a-generic-programming-thread-pool/ | |
""" | |
import threading
import time
from Queue import Empty
from Queue import Queue
class EmptyPool(Exception):
    """Raised by a pool's ``dequeue_task`` when no task is available."""
class DrainingPool(Exception):
    """Raised when a task is queued on a pool that is already draining."""
class _ThreadPoolThread(threading.Thread): | |
"""The runner thread for a thread in the thread pool. | |
This class is coupled with a pool object that's derived from | |
AbstractThreadPool. | |
""" | |
daemon = True | |
thread_sleep_time = 0.1 | |
def __init__(self, pool): | |
# pool should be an AbstractThreadPool object | |
super(_ThreadPoolThread, self).__init__() | |
self._pool = pool | |
self._draining = False | |
def run(self): | |
while self._draining is False: | |
try: | |
task, args, callback = self._pool.dequeue_task() | |
except EmptyPool: | |
time.sleep(self.thread_sleep_time) | |
else: | |
if task is None: | |
time.sleep(self.thread_sleep_time) | |
elif callback is None: | |
task(args) | |
else: | |
callback(task(args)) | |
# signal queue that the task is done | |
self._pool.task_done(self) | |
def drain(self): | |
"""Kill this worker thread next time through. | |
""" | |
self._draining = True | |
class AbstractThreadPool(object):
    """Base class that owns a fixed set of worker threads.

    Subclasses supply the source of work by overriding ``dequeue_task`` and,
    if they need completion bookkeeping, ``task_done``.
    """

    def __init__(self, pool_size):
        """Create and start *pool_size* worker threads."""
        # Finish initialising the pool's own state before any worker is
        # started; workers begin calling back into the pool immediately.
        self._draining = False
        self._lock = threading.Lock()
        self._threads = []
        for _ in range(pool_size):
            worker = _ThreadPoolThread(self)
            self._threads.append(worker)
            worker.start()

    @property
    def lock(self):
        """The lock guarding changes to the draining flag."""
        return self._lock

    @property
    def draining(self):
        """Whether the pool has been asked to drain.

        NOT thread-safe by itself; hold ``lock`` around accesses that must be
        atomic with other operations.
        """
        return self._draining

    @draining.setter
    def draining(self, value):
        # NOT thread-safe! The value is coerced to a plain bool.
        self._draining = bool(value)

    def dequeue_task(self):
        """Retrieve the next available task tracked by this thread pool.

        Subclass implementations must return a tuple
        ``(func, args, callback)`` or raise an EmptyPool exception.
        """
        # default implementation: report no more tasks; this causes the
        # worker thread to recheck its internal draining flag
        raise EmptyPool()

    def task_done(self, thread):
        """Callback from a worker thread, invoked after it handled a task.

        The default implementation does nothing.
        """
        pass

    def drain(self, wait_for_threads=True):
        """Set the pool's draining flag and optionally stop the workers.

        When *wait_for_threads* is true, every worker is told to stop and
        this call blocks until all of them have exited.
        """
        # make sure no more tasks will be queued
        with self.lock:
            self.draining = True

        if wait_for_threads:
            # Signal every worker first, then join them, so the workers wind
            # down concurrently instead of one after another.
            for worker in self._threads:
                worker.drain()
            for worker in self._threads:
                worker.join()
class ThreadPool(AbstractThreadPool):
    """A thread pool fed through an internal task queue."""

    def __init__(self, pool_size, queue_size=0):
        """Create the pool.

        *pool_size* is the number of worker threads; *queue_size* is passed
        to ``Queue`` as its maximum size.
        """
        # The work queue must exist before the base class starts the
        # workers, because they begin polling dequeue_task() right away.
        self._tasks = Queue(queue_size)
        super(ThreadPool, self).__init__(pool_size)

    def queue_task(self, task, args=None, callback=None):
        """Queue a task (a callable) for the thread pool.

        The worker runs ``task(args)`` and, if *callback* is given, passes
        the result to it.  This call will block if the pool's queue size is
        defined and its task queue is currently full.

        Raises TypeError if *task* is not callable and DrainingPool if the
        pool has started draining.
        """
        if not callable(task):
            raise TypeError('Task: {0} is not callable!'.format(repr(task)))

        # check-and-put under the lock so drain() cannot slip in between
        with self.lock:
            if self.draining:
                raise DrainingPool()
            self._tasks.put((task, args, callback))

    def dequeue_task(self):
        """Retrieve the next available task from the thread pool.

        Raises EmptyPool when the queue currently holds no task.
        """
        # Never block here, so the worker can re-check its drain flag.  A
        # separate empty() check followed by a blocking get() is racy:
        # another worker may take the last item in between.
        try:
            return self._tasks.get_nowait()
        except Empty:
            raise EmptyPool()

    def task_done(self, thread):
        """Used by a worker thread to signal that it has just completed
        a task.
        """
        self._tasks.task_done()

    def drain(self, wait_for_tasks=True, wait_for_threads=True):
        """Stop accepting tasks, and optionally wait for outstanding tasks
        and threads to finish.

        With *wait_for_tasks* true, block until every queued task has been
        marked done.  With *wait_for_threads* true, tell every worker to
        stop and block until all of them have exited.
        """
        # make sure no more tasks will be queued
        with self.lock:
            self.draining = True

        if wait_for_tasks:
            self._tasks.join()

        if wait_for_threads:
            # Signal all workers before joining any, so they wind down
            # concurrently instead of one after another.
            for worker in self._threads:
                worker.drain()
            for worker in self._threads:
                worker.join()
class LockedIterator(object):
    """A wrapper that makes an iterator or generator safe to share between
    threads by serialising every advance behind a lock.
    """

    def __init__(self, iterator):
        """Wrap *iterator*, which may be any iterable."""
        self._lock = threading.Lock()
        self._it = iter(iterator)

    def __iter__(self):
        # this wrapper is, in turn, an iterator itself
        return self

    def next(self):
        """Return the next item of the wrapped iterator.

        Raises StopIteration once the wrapped iterator is exhausted.
        """
        with self._lock:
            return next(self._it)

    # Expose the same method under the Python 3 iterator protocol name.
    __next__ = next
class DrainingThreadPool(AbstractThreadPool):
    """A thread pool configured with an iterator/generator from which its
    worker threads pull tasks.  This eliminates the need to manually push
    tasks into a queue.

    The supplied iterator is wrapped in a LockedIterator and must generate
    items as 3-tuples:
        (func, args, callback)
    """

    def __init__(self, pool_size, iterator):
        """Create the pool with *pool_size* workers fed from *iterator*."""
        # The task source and the exhaustion event must exist before the
        # base class starts the workers, which poll dequeue_task() at once.
        self._it = LockedIterator(iterator)
        self._empty = threading.Event()
        super(DrainingThreadPool, self).__init__(pool_size)

    def dequeue_task(self):
        """Retrieve the next available task from the task iterator.

        Raises EmptyPool once the iterator is exhausted.
        """
        try:
            return self._it.next()
        except StopIteration:
            # record exhaustion so drain() can stop waiting
            self._empty.set()
            raise EmptyPool()

    def drain(self, wait_for_tasks=True, wait_for_threads=True):
        """Mark the pool as draining, and optionally wait for the task
        iterator to be exhausted and for the worker threads to finish.

        With *wait_for_tasks* true, block until a worker has found the task
        iterator exhausted.  With *wait_for_threads* true, tell every worker
        to stop and block until all of them have exited.
        """
        with self.lock:
            self.draining = True

        if wait_for_tasks:
            self._empty.wait()

        if wait_for_threads:
            # Signal all workers before joining any, so they wind down
            # concurrently instead of one after another.
            for worker in self._threads:
                worker.drain()
            for worker in self._threads:
                worker.join()
# testing the module | |
if __name__ == '__main__': | |
from random import randrange | |
def sortTask(data): | |
print "SortTask starting for ", data | |
numbers = range(data[0], data[1]) | |
for a in numbers: | |
rnd = randrange(0, len(numbers) - 1) | |
a, numbers[rnd] = numbers[rnd], a | |
print "SortTask sorting for ", data | |
numbers.sort() | |
print "SortTask done for ", data | |
return "Sorter ", data | |
def waitTask(data): | |
print "WaitTask starting for ", data | |
print "WaitTask sleeping for %d seconds" % data | |
time.sleep(data) | |
return "Waiter", data | |
def taskCallback(data): | |
print "Callback called for", data | |
print 'Testing ThreadPool...' | |
pool = ThreadPool(3) | |
pool.queue_task(sortTask, (1000, 100000), taskCallback) | |
pool.queue_task(waitTask, 5, taskCallback) | |
pool.queue_task(sortTask, (200, 200000), taskCallback) | |
pool.queue_task(waitTask, 2, taskCallback) | |
pool.queue_task(sortTask, (3, 30000), taskCallback) | |
pool.queue_task(waitTask, 7, taskCallback) | |
pool.drain() | |
print '-----------------------------' | |
print 'Testing DrainingThreadPool...' | |
tasks = [ | |
(sortTask, (1000, 100000), taskCallback), | |
(waitTask, 5, taskCallback), | |
(sortTask, (200, 200000), taskCallback), | |
(waitTask, 2, taskCallback), | |
(sortTask, (3, 30000), taskCallback), | |
(waitTask, 7, taskCallback), | |
] | |
pool = DrainingThreadPool(3, tasks) | |
pool.drain() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment