Skip to content

Instantly share code, notes, and snippets.

@santa4nt
Created November 11, 2010 01:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save santa4nt/671838 to your computer and use it in GitHub Desktop.
Save santa4nt/671838 to your computer and use it in GitHub Desktop.
A simple thread pool implementation.
# -*- coding: utf-8 -*-
"""A thread pool implementation.
Parts of this module are derived from:
http://code.activestate.com/recipes/203871-a-generic-programming-thread-pool/
"""
import time
import threading
import traceback
from Queue import Empty
from Queue import Queue
class EmptyPool(Exception):
    """Signals that a pool has no task available right now.

    Raised from a pool's dequeue_task(); a worker thread that sees it
    sleeps briefly and then re-checks whether it has been asked to drain.
    """
class DrainingPool(Exception):
    """Signals that a task was submitted after the pool started draining.

    ThreadPool.queue_task() raises it once drain() has been called.
    """
class _ThreadPoolThread(threading.Thread):
    """The runner thread for a thread in the thread pool.

    This class is coupled with a pool object derived from
    AbstractThreadPool: it repeatedly calls ``pool.dequeue_task()``, which
    must return a ``(func, args, callback)`` tuple or raise EmptyPool, and
    reports every dequeued item back through ``pool.task_done(thread)``.
    """

    # seconds to sleep when the pool has no task (or a None task) to offer
    thread_sleep_time = 0.1

    def __init__(self, pool):
        # pool should be an AbstractThreadPool object
        super(_ThreadPoolThread, self).__init__()
        # Set the real Thread.daemon property (a class attribute of the same
        # name would only shadow it) so workers never keep the process alive.
        self.daemon = True
        self._pool = pool
        self._draining = False

    def run(self):
        """Pull and execute tasks until drain() has been requested."""
        while not self._draining:
            try:
                task, args, callback = self._pool.dequeue_task()
            except EmptyPool:
                time.sleep(self.thread_sleep_time)
                continue
            try:
                self._execute(task, args, callback)
            finally:
                # Always acknowledge the dequeued item, even if the task
                # failed; otherwise a Queue.join() waiting on it (as in
                # ThreadPool.drain) would block forever.
                self._pool.task_done(self)

    def _execute(self, task, args, callback):
        """Run one task as ``task(args)``, then ``callback(result)`` if given.

        A None task just makes the worker idle for thread_sleep_time.
        Exceptions from the task or callback are reported, not propagated.
        """
        if task is None:
            time.sleep(self.thread_sleep_time)
            return
        try:
            result = task(args)
            if callback is not None:
                callback(result)
        except Exception:
            # An uncaught exception would terminate this worker for good and
            # shrink the pool; print the traceback to stderr and keep serving.
            traceback.print_exc()

    def drain(self):
        """Ask this worker to exit.

        The flag is checked at the top of each loop iteration, so a task
        that is currently running is finished first.
        """
        self._draining = True
class AbstractThreadPool(object):
    """Base class for a pool of _ThreadPoolThread workers.

    Workers call dequeue_task() to obtain work and task_done() after every
    dequeued item. Subclasses override those two hooks to supply tasks; this
    base implementation never has any.
    """

    def __init__(self, pool_size):
        """Start ``pool_size`` worker threads bound to this pool."""
        # Shared state must exist *before* any worker starts: workers call
        # dequeue_task() immediately, and subclass implementations may use
        # self.lock / self.draining from there.
        self._draining = False
        self._lock = threading.Lock()
        self._threads = []
        for _ in range(pool_size):
            worker = _ThreadPoolThread(self)
            self._threads.append(worker)
            worker.start()

    @property
    def lock(self):
        """Lock held by drain() while it sets the draining flag."""
        return self._lock

    @property
    def draining(self):
        """Whether the pool has been told to drain.

        NOT thread-safe by itself: hold ``self.lock`` when the check must be
        atomic with another operation.
        """
        return self._draining

    @draining.setter
    def draining(self, value):
        # NOT thread-safe by itself; hold self.lock where atomicity matters
        self._draining = bool(value)

    def dequeue_task(self):
        """Retrieve the next available task tracked by this thread pool.

        Subclass implementation of the method must return a tuple:
        (func, args, callback) or raise an EmptyPool exception.
        """
        # default implementation: report no more task; this will cause
        # the worker thread to recheck its internal draining flag
        raise EmptyPool()

    def task_done(self, thread):
        """Callback from a worker thread after it has processed a dequeued item.

        The base implementation does nothing.
        """
        pass

    def drain(self, wait_for_threads=True):
        """Set the pool's draining flag, and optionally stop and join the
        worker threads (each finishes its current task first).
        """
        # make sure no more tasks will be queued
        with self.lock:
            self.draining = True
        if wait_for_threads:
            # Signal every worker before joining any of them, so they wind
            # down concurrently rather than one after another.
            for worker in self._threads:
                worker.drain()
            for worker in self._threads:
                worker.join()
class ThreadPool(AbstractThreadPool):
    """A thread pool fed through an internal FIFO task queue.

    Callers push work with queue_task(); the worker threads started by
    AbstractThreadPool pull it back out through dequeue_task().
    """

    def __init__(self, pool_size, queue_size=0):
        """Create the task queue and start ``pool_size`` worker threads.

        ``queue_size`` bounds the number of pending tasks; 0 (the default)
        means unbounded, as for Queue.
        """
        # the queue must exist before the base class starts the workers,
        # because they begin calling dequeue_task() straight away
        self._tasks = Queue(queue_size)
        super(ThreadPool, self).__init__(pool_size)

    def queue_task(self, task, args=None, callback=None):
        """Queue a task (a callable) for the thread pool.

        A worker will call ``task(args)`` and then, if given,
        ``callback(result)``. This call blocks if the pool's queue size is
        bounded and its task queue is currently full.

        Raises TypeError if ``task`` is not callable, and DrainingPool if
        the pool has started draining.
        """
        if not callable(task):
            raise TypeError('Task: {0} is not callable!'.format(repr(task)))
        # checking the flag and enqueueing under the lock keeps this atomic
        # with respect to drain(), so nothing slips in after draining starts
        with self.lock:
            if self.draining:
                raise DrainingPool()
            self._tasks.put((task, args, callback))

    def dequeue_task(self):
        """Retrieve the next available task, or raise EmptyPool if none."""
        # Never block here, so the worker can re-check its drain flag.
        # get_nowait() is atomic; an empty()-then-get() pair lets two workers
        # race for the last item and leaves the loser blocked in get()
        # forever, which would make drain() hang on join().
        try:
            return self._tasks.get_nowait()
        except Empty:
            raise EmptyPool()

    def task_done(self, thread):
        """Used by a worker thread to signal that it has just processed a
        dequeued item.
        """
        self._tasks.task_done()

    def drain(self, wait_for_tasks=True, wait_for_threads=True):
        """Stop accepting new tasks, then optionally wait for the work to end.

        wait_for_tasks: block until every queued task has been processed.
        wait_for_threads: stop and join the worker threads; each finishes
            its current task first. Tasks still queued at that point (only
            possible with wait_for_tasks=False) are left unprocessed.
        """
        # make sure no more tasks will be queued
        with self.lock:
            self.draining = True
        if wait_for_tasks:
            self._tasks.join()
        # the base class stops (and optionally joins) the worker threads
        super(ThreadPool, self).drain(wait_for_threads)
class LockedIterator(object):
    """A wrapper that lets several threads pull items from one iterator.

    Every step of the wrapped iterator (e.g. a generator) happens under a
    lock, so it is never advanced by two threads at once.
    """

    def __init__(self, iterator):
        self._lock = threading.Lock()
        # iter() accepts any iterable, not only objects defining __iter__
        self._it = iter(iterator)

    def __iter__(self):
        # the wrapper is its own iterator
        return self

    def next(self):
        """Return the next item of the wrapped iterator, holding the lock.

        Raises StopIteration once the wrapped iterator is exhausted.
        """
        with self._lock:
            return next(self._it)

    # Python 3 spelling of the iterator protocol method
    __next__ = next
class DrainingThreadPool(AbstractThreadPool):
    """A thread pool whose worker threads pull tasks straight from an iterator.

    This eliminates the need to manually push tasks into an internal queue.
    The iterator (or generator, or other iterable) is wrapped in a
    LockedIterator, so it does not have to be thread-safe itself.
    It must generate items as 3-tuples:
        (func, args, callback)
    """

    def __init__(self, pool_size, iterator):
        # both attributes must exist before the base class starts the
        # workers, which call dequeue_task() immediately
        self._it = LockedIterator(iterator)
        # set once the iterator has been found exhausted
        self._empty = threading.Event()
        super(DrainingThreadPool, self).__init__(pool_size)

    def dequeue_task(self):
        """Return the iterator's next (func, args, callback) tuple.

        Once the iterator is exhausted, record that and raise EmptyPool.
        """
        # don't wait for new tasks: the worker must get back to its loop
        # so it can re-check its drain flag
        try:
            return self._it.next()
        except StopIteration:
            self._empty.set()
            raise EmptyPool()

    def drain(self, wait_for_tasks=True, wait_for_threads=True):
        """Mark the pool as draining, and optionally wait for it to wind down.

        wait_for_tasks: block until the iterator is exhausted, i.e. every
            task has been handed to a worker. Tasks still running then are
            awaited only via wait_for_threads. The wait gives up when no live
            worker is left to consume the iterator (e.g. pool_size == 0).
        wait_for_threads: stop and join the worker threads; each finishes
            the task it is running first.
        """
        with self.lock:
            self.draining = True
        if wait_for_tasks:
            while not self._empty.is_set():
                # without a live worker nobody can exhaust the iterator, and
                # an unconditional wait() would block forever
                if not any(t.is_alive() for t in self._threads):
                    break
                self._empty.wait(0.1)
        # the base class stops (and optionally joins) the worker threads
        super(DrainingThreadPool, self).drain(wait_for_threads)
# testing the module
if __name__ == '__main__':
    from random import shuffle

    def sortTask(data):
        """Demo task: scramble, then sort, range(data[0], data[1])."""
        print("SortTask starting for  %s" % (data,))
        numbers = list(range(data[0], data[1]))
        # scramble the numbers so that sort() has real work to do
        shuffle(numbers)
        print("SortTask sorting for  %s" % (data,))
        numbers.sort()
        print("SortTask done for  %s" % (data,))
        return "Sorter ", data

    def waitTask(data):
        """Demo task: sleep for ``data`` seconds."""
        print("WaitTask starting for  %s" % (data,))
        print("WaitTask sleeping for %d seconds" % data)
        time.sleep(data)
        return "Waiter", data

    def taskCallback(data):
        """Demo callback: report the result of a finished task."""
        print("Callback called for %s" % (data,))

    print('Testing ThreadPool...')
    pool = ThreadPool(3)
    pool.queue_task(sortTask, (1000, 100000), taskCallback)
    pool.queue_task(waitTask, 5, taskCallback)
    pool.queue_task(sortTask, (200, 200000), taskCallback)
    pool.queue_task(waitTask, 2, taskCallback)
    pool.queue_task(sortTask, (3, 30000), taskCallback)
    pool.queue_task(waitTask, 7, taskCallback)
    pool.drain()

    print('-----------------------------')
    print('Testing DrainingThreadPool...')
    tasks = [
        (sortTask, (1000, 100000), taskCallback),
        (waitTask, 5, taskCallback),
        (sortTask, (200, 200000), taskCallback),
        (waitTask, 2, taskCallback),
        (sortTask, (3, 30000), taskCallback),
        (waitTask, 7, taskCallback),
    ]
    pool = DrainingThreadPool(3, tasks)
    pool.drain()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment