Skip to content

Instantly share code, notes, and snippets.

@andreisavu
Created June 4, 2009 12:30
Show Gist options
  • Save andreisavu/123591 to your computer and use it in GitHub Desktop.
Save andreisavu/123591 to your computer and use it in GitHub Desktop.
Python Thread Pool
# Original code at http://code.activestate.com/recipes/203871/
# I have added a limit for the number of tasks waiting
# The method queueTask will block if the current taskList size
# exceeds the specified limit
import threading
from time import sleep
# Ensure booleans exist (not needed for Python 2.2.1 or higher)
try:
True
except NameError:
False = 0
True = not False
class ThreadPool:
"""Flexible thread pool class. Creates a pool of threads, then
accepts tasks that will be dispatched to the next available
thread."""
def __init__(self, numThreads, numTasks=0):
"""Initialize the thread pool with numThreads workers. Limit
the maximum number of taks to numTasks or unlimited if 0"""
self.__threads = []
self.__resizeLock = threading.Condition(threading.Lock())
self.__taskLock = threading.Condition(threading.Lock())
self.__taskPop = threading.Event()
self.__tasks = []
self.__isJoining = False
self.__numTasks = numTasks
self.setThreadCount(numThreads)
def setThreadCount(self, newNumThreads):
""" External method to set the current pool size. Acquires
the resizing lock, then calls the internal version to do real
work."""
# Can't change the thread count if we're shutting down the pool!
if self.__isJoining:
return False
self.__resizeLock.acquire()
try:
self.__setThreadCountNolock(newNumThreads)
finally:
self.__resizeLock.release()
return True
def __setThreadCountNolock(self, newNumThreads):
"""Set the current pool size, spawning or terminating threads
if necessary. Internal use only; assumes the resizing lock is
held."""
# If we need to grow the pool, do so
while newNumThreads > len(self.__threads):
newThread = ThreadPoolThread(self)
self.__threads.append(newThread)
newThread.start()
# If we need to shrink the pool, do so
while newNumThreads < len(self.__threads):
self.__threads[0].goAway()
del self.__threads[0]
def getThreadCount(self):
"""Return the number of threads in the pool."""
self.__resizeLock.acquire()
try:
return len(self.__threads)
finally:
self.__resizeLock.release()
def queueTask(self, task, args=None, taskCallback=None):
"""Insert a task into the queue. task must be callable;
args and taskCallback can be None."""
if self.__isJoining == True:
return False
if not callable(task):
return False
self.__taskLock.acquire()
try:
while len(self.__tasks) > self.__numTasks > 0:
self.__taskLock.release()
self.__taskPop.wait()
self.__taskLock.acquire()
self.__tasks.append((task, args, taskCallback))
return True
finally:
self.__taskLock.release()
def getNextTask(self):
""" Retrieve the next task from the task queue. For use
only by ThreadPoolThread objects contained in the pool."""
self.__taskLock.acquire()
try:
if self.__tasks == []:
return (None, None, None)
else:
return self.__tasks.pop(0)
finally:
self.__taskPop.set()
self.__taskPop.clear()
self.__taskLock.release()
def joinAll(self, waitForTasks = True, waitForThreads = True):
""" Clear the task queue and terminate all pooled threads,
optionally allowing the tasks and threads to finish."""
# Mark the pool as joining to prevent any more task queueing
self.__isJoining = True
# Wait for tasks to finish
if waitForTasks:
while self.__tasks != []:
sleep(.1)
# Tell all the threads to quit
self.__resizeLock.acquire()
try:
self.__setThreadCountNolock(0)
self.__isJoining = True
# Wait until all threads have exited
if waitForThreads:
for t in self.__threads:
t.join()
del t
# Reset the pool for potential reuse
self.__isJoining = False
finally:
self.__resizeLock.release()
class ThreadPoolThread(threading.Thread):
""" Pooled thread class. """
threadSleepTime = 0.1
def __init__(self, pool):
""" Initialize the thread and remember the pool. """
threading.Thread.__init__(self)
self.__pool = pool
self.__isDying = False
def run(self):
""" Until told to quit, retrieve the next task and execute
it, calling the callback if any. """
while self.__isDying == False:
cmd, args, callback = self.__pool.getNextTask()
# If there's nothing to do, just sleep a bit
if cmd is None:
sleep(ThreadPoolThread.threadSleepTime)
elif callback is None:
cmd(args)
else:
callback(cmd(args))
def goAway(self):
""" Exit the run loop next time through."""
self.__isDying = True
# Usage example
if __name__ == "__main__":
from random import randrange
# Sample task 1: given a start and end value, shuffle integers,
# then sort them
def sortTask(data):
print "SortTask starting for ", data
numbers = range(data[0], data[1])
for a in numbers:
rnd = randrange(0, len(numbers) - 1)
a, numbers[rnd] = numbers[rnd], a
print "SortTask sorting for ", data
numbers.sort()
print "SortTask done for ", data
return "Sorter ", data
# Sample task 2: just sleep for a number of seconds.
def waitTask(data):
print "WaitTask starting for ", data
print "WaitTask sleeping for %d seconds" % data
sleep(data)
return "Waiter", data
# Both tasks use the same callback
def taskCallback(data):
print "Callback called for", data
# Create a pool with three worker threads
pool = ThreadPool(3)
# Insert tasks into the queue and let them run
pool.queueTask(sortTask, (1000, 100000), taskCallback)
pool.queueTask(waitTask, 5, taskCallback)
pool.queueTask(sortTask, (200, 200000), taskCallback)
pool.queueTask(waitTask, 2, taskCallback)
pool.queueTask(sortTask, (3, 30000), taskCallback)
pool.queueTask(waitTask, 7, taskCallback)
# When all tasks are finished, allow the threads to terminate
pool.joinAll()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment