Skip to content

Instantly share code, notes, and snippets.

@kevinkreiser
Created August 31, 2017 15:15
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save kevinkreiser/6265428dbbbc8233aa8fd021bff0343a to your computer and use it in GitHub Desktop.
Save kevinkreiser/6265428dbbbc8233aa8fd021bff0343a to your computer and use it in GitHub Desktop.
Work Queue Thread Pool Example in Python
#!/usr/bin/env python
#this is mostly from:
#http://code.activestate.com/recipes/577187-python-thread-pool/
try:
    from queue import Queue, Empty  # Python 3
except ImportError:
    from Queue import Queue, Empty  # Python 2
from threading import Thread, Event
from sys import stdout, stderr
from time import sleep
#default exception handler. if you want to take some action on failed tasks
#maybe add the task back into the queue, then make your own handler and pass it in
def default_handler(name, exception, *args, **kwargs):
    """Report a failed task on stderr.

    :param name: name of the worker thread the task ran on
    :param exception: the exception the task raised
    :param args: extra positional values; Worker.run passes the task's args
        tuple and kwargs dict here as two positional values
    :param kwargs: extra keyword values (empty when called from Worker.run)
    """
    # stderr.write works on both Python 2 and 3, unlike the print statement
    stderr.write('%s raised %s with args %s and kwargs %s\n' % (
        name, str(exception), repr(args), repr(kwargs)))
#class for workers
class Worker(Thread):
    """Thread executing tasks from a given tasks queue.

    Each task is a (func, args, kwargs) tuple. The thread starts itself as a
    daemon from __init__ and keeps taking tasks until ``abort`` is set.
    """

    # seconds an idle worker blocks waiting for a task before re-checking
    # abort; bounds how long it takes an idle worker to notice abort
    poll_interval = 0.1

    def __init__(self, name, queue, results, abort, idle, exception_handler):
        """Store the shared queues/events and start the thread immediately.

        :param name: thread name, also passed to exception_handler
        :param queue: Queue of (func, args, kwargs) tasks
        :param results: Queue that receives every non-None task result
        :param abort: Event; once set, the thread exits before taking another task
        :param idle: Event; set when no task was available, cleared while working
        :param exception_handler: called as exception_handler(name, exception,
            args, kwargs) when a task raises
        """
        Thread.__init__(self)
        self.name = name
        self.queue = queue
        self.results = results
        self.abort = abort
        self.idle = idle
        self.exception_handler = exception_handler
        self.daemon = True
        self.start()

    def run(self):
        """Thread work loop calling the function with the params."""
        #keep running until told to abort
        while not self.abort.is_set():
            try:
                # block briefly rather than polling with get(False), which
                # spins at 100% CPU while the queue is empty; the timeout
                # keeps the abort check above responsive
                func, args, kwargs = self.queue.get(True, self.poll_interval)
            except Empty:
                #no work to do
                self.idle.set()
                continue
            self.idle.clear()
            try:
                #the function may raise
                result = func(*args, **kwargs)
                if result is not None:
                    self.results.put(result)
            except Exception as e:
                #so we move on and handle it in whatever way the caller wanted
                self.exception_handler(self.name, e, args, kwargs)
            finally:
                #task complete no matter what happened
                self.queue.task_done()
#class for thread pool
class Pool:
    """Pool of threads consuming tasks from a queue.

    Tasks may be queued with enqueue() before or after run() starts the
    workers; non-None task results are collected and handed back by results().
    """

    def __init__(self, thread_count, batch_mode=False, exception_handler=default_handler):
        """Create the pool; no threads are started until run() is called.

        :param thread_count: number of Worker threads run() starts
        :param batch_mode: if True the task queue is bounded to thread_count
            entries, so enqueue() blocks while it is full (a thread_count of
            0 leaves it unbounded)
        :param exception_handler: called as handler(thread_name, exception,
            args, kwargs) when a task raises
        """
        #batch mode means block when adding tasks if no threads available to process
        self.queue = Queue(thread_count if batch_mode else 0)
        self.resultQueue = Queue(0)
        self.thread_count = thread_count
        self.exception_handler = exception_handler
        self.aborts = []
        self.idles = []
        self.threads = []

    def __del__(self):
        """Tell my threads to quit."""
        self.abort()

    def _wait_for_threads(self):
        """Block until every thread started by the last run() has exited."""
        # join() wakes as soon as a thread exits, instead of polling every second
        for t in self.threads:
            t.join()

    def run(self, block=False):
        """Start the threads, or restart them if you've aborted.

        :param block: if True and threads from a previous run() are still
            alive, wait for them to exit first (they exit once abort() is called)
        :returns: False, without starting anything, if threads are still alive
            and block is False; True once a fresh set of threads is started
        """
        if block:
            self._wait_for_threads()
        elif self.alive():
            return False
        #go start them
        self.aborts = []
        self.idles = []
        self.threads = []
        for n in range(self.thread_count):
            abort = Event()
            idle = Event()
            self.aborts.append(abort)
            self.idles.append(idle)
            self.threads.append(Worker('thread-%d' % n, self.queue, self.resultQueue,
                                       abort, idle, self.exception_handler))
        return True

    def enqueue(self, func, *args, **kwargs):
        """Add a task to the queue; it will be run as func(*args, **kwargs)."""
        self.queue.put((func, args, kwargs))

    def join(self):
        """Wait for completion of all the tasks in the queue."""
        self.queue.join()

    def abort(self, block=False):
        """Tell each worker that it's done working.

        Workers stop after finishing the task they are currently running;
        tasks still waiting in the queue stay there for the next run().

        :param block: if True, wait until every worker thread has exited
        """
        for a in self.aborts:
            a.set()
        if block:
            self._wait_for_threads()

    def alive(self):
        """Returns True if any threads are currently running."""
        return any(t.is_alive() for t in self.threads)

    def idle(self):
        """Returns True if all threads are waiting for work (also True before run())."""
        return all(i.is_set() for i in self.idles)

    def done(self):
        """Returns True if no tasks are left to be completed.

        Unlike queue.empty(), this also counts tasks a worker has already
        taken off the queue but not finished yet.
        """
        # unfinished_tasks goes up on put() and down on task_done(); it is the
        # counter Queue.join() waits on (CPython Queue implementation detail)
        return self.queue.unfinished_tasks == 0

    def results(self, wait=0):
        """Get the set of results that have been processed, repeatedly call until done.

        :param wait: seconds to sleep before collecting
        :returns: list of every result currently available, oldest first
        """
        sleep(wait)
        collected = []
        while True:
            try:
                #get a result, raises Empty immediately if none available
                collected.append(self.resultQueue.get(False))
            except Empty:
                break
            self.resultQueue.task_done()
        return collected
#run the test
if __name__ == '__main__':
    def work(x):
        """Simulate a slow task: sleep 5 seconds, report, and return x."""
        sleep(5)
        stdout.write('%d finished\n' % x)
        return x

    def print_results(p, wait=0):
        """Print every result the pool has collected so far."""
        for result in p.results(wait):
            stdout.write('got result %s\n' % str(result))

    def wait_for_results(p):
        """Print results until the pool reports no work left and all workers idle."""
        while not p.done() or not p.idle():
            # let results() sleep briefly so this loop does not spin a core
            print_results(p, 0.1)
        # drain anything that arrived after the last poll
        print_results(p)

    p = Pool(3)
    stdout.write('queueing work\n')
    for i in range(10):
        p.enqueue(work, i)
    stdout.write('starting work\n')
    p.run(True)
    stdout.write('waiting\n')
    sleep(8)
    print_results(p)
    stdout.write('cancel unstarted work\n')
    # abort() stops the workers once their current task finishes; tasks still
    # queued are not discarded and get picked up after the restart below
    p.abort()
    stdout.write('waiting and restarting\n')
    p.run(True)
    stdout.write('restarted waiting for results\n')
    wait_for_results(p)
    stdout.write('adding more work\n')
    for i in range(10, 17):
        p.enqueue(work, i)
    wait_for_results(p)
@medram
Copy link

medram commented May 24, 2020

Thanks a lot, this is really helping :D

@Antohnio123
Copy link

Antohnio123 commented Nov 18, 2020

Fine, but there is no `Queue` module in the Python 3 standard library. PyCharm suggests `from queue import Queue` instead.

Do I understand correctly that if we want an everlasting pool, one that never stops watching the queue for new tasks, we don't need to call join() or close() on the pool?

@kevinkreiser
Copy link
Author

@Antohnio123 this was written for Python 2, as you can see from the print statements. You'll have to do a small port to make it Python 3 ready.

@medram
Copy link

medram commented Dec 25, 2020

Hello everyone, I've improved the code above in my own way; please check it out :D :
https://github.com/medram/Pool_Workers

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment