Skip to content

Instantly share code, notes, and snippets.

Created August 31, 2017 15:15
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save kevinkreiser/6265428dbbbc8233aa8fd021bff0343a to your computer and use it in GitHub Desktop.
Save kevinkreiser/6265428dbbbc8233aa8fd021bff0343a to your computer and use it in GitHub Desktop.
Work Queue Thread Pool Example in Python
#!/usr/bin/env python
#this is mostly from:
from Queue import Queue
from threading import Thread, Event
from sys import stdout, stderr
from time import sleep
#default exception handler. if you want to take some action on failed tasks
#maybe add the task back into the queue, then make your own handler and pass it in
def default_handler(name, exception, *args, **kwargs):
print >> stderr, '%s raised %s with args %s and kwargs %s' % (name, str(exception), repr(args), repr(kwargs))
#class for workers
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, name, queue, results, abort, idle, exception_handler):
Thread.__init__(self) = name
self.queue = queue
self.results = results
self.abort = abort
self.idle = idle
self.exception_handler = exception_handler
self.daemon = True
"""Thread work loop calling the function with the params"""
def run(self):
#keep running until told to abort
while not self.abort.is_set():
#get a task and raise immediately if none available
func, args, kwargs = self.queue.get(False)
#no work to do
#if not self.idle.is_set():
# print >> stdout, '%s is idle' %
#the function may raise
result = func(*args, **kwargs)
if(result is not None):
except Exception as e:
#so we move on and handle it in whatever way the caller wanted
self.exception_handler(, e, args, kwargs)
#task complete no matter what happened
#class for thread pool
class Pool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, thread_count, batch_mode = False, exception_handler = default_handler):
#batch mode means block when adding tasks if no threads available to process
self.queue = Queue(thread_count if batch_mode else 0)
self.resultQueue = Queue(0)
self.thread_count = thread_count
self.exception_handler = exception_handler
self.aborts = []
self.idles = []
self.threads = []
"""Tell my threads to quit"""
def __del__(self):
"""Start the threads, or restart them if you've aborted"""
def run(self, block = False):
#either wait for them to finish or return false if some arent
if block:
while self.alive():
elif self.alive():
return False
#go start them
self.aborts = []
self.idles = []
self.threads = []
for n in range(self.thread_count):
abort = Event()
idle = Event()
self.threads.append(Worker('thread-%d' % n, self.queue, self.resultQueue, abort, idle, self.exception_handler))
return True
"""Add a task to the queue"""
def enqueue(self, func, *args, **kargs):
self.queue.put((func, args, kargs))
"""Wait for completion of all the tasks in the queue"""
def join(self):
"""Tell each worker that its done working"""
def abort(self, block = False):
#tell the threads to stop after they are done with what they are currently doing
for a in self.aborts:
#wait for them to finish if requested
while block and self.alive():
"""Returns True if any threads are currently running"""
def alive(self):
return True in [t.is_alive() for t in self.threads]
"""Returns True if all threads are waiting for work"""
def idle(self):
return False not in [i.is_set() for i in self.idles]
"""Returns True if not tasks are left to be completed"""
def done(self):
return self.queue.empty()
"""Get the set of results that have been processed, repeatedly call until done"""
def results(self, wait = 0):
results = []
while True:
#get a result, raises empty exception immediately if none available
return results
#run the test
if __name__ == '__main__':
def work(x):
print >> stdout, '%d finished' % x
return x
def wait_for_results(p):
while not p.done() or not p.idle():
for result in p.results():
print >> stdout, 'got result %s' % str(result)
for result in p.results():
print >> stdout, 'got result %s' % str(result)
print >> stdout, 'queueing work'
for i in range(10):
p.enqueue(work, i)
print >> stdout, 'starting work'
print >> stdout, 'waiting'
for result in p.results():
print >> stdout, 'got result %s' % str(result)
print >> stdout, 'cancel unstarted work'
print >> stdout, 'waiting and restarting'
print >> stdout, 'restarted waiting for results'
print >> stdout, 'adding more work'
for i in range(10,17):
p.enqueue(work, i)
Copy link

medram commented May 24, 2020

Thanks a lot, this is really helping :D

Copy link

Antohnio123 commented Nov 18, 2020

Fine, but there is no Queue in standard library. Pycharm advises to do "from queue import Queue".

Do I understand correct, that while we want an everlasting pool, that never stops watching for new tasks in queue, we don't need to use join() or close() to the pool?

Copy link

@Antohnio123 this was written for python2 as you can see from the print statements. You'll have to do a small port to get it python3 ready

Copy link

medram commented Dec 25, 2020

Hello everyone, I've improved this code above in my way, please check this out :D :

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment