Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@Nexuapex
Created January 8, 2015 03:15
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Nexuapex/f7fbd6b4d67bc1c3da75 to your computer and use it in GitHub Desktop.
Save Nexuapex/f7fbd6b4d67bc1c3da75 to your computer and use it in GitHub Desktop.
Simple Python MPMC queue and thread pool.
from __future__ import absolute_import
import sys
import threading
import collections
class WorkerPool(object):
    """A thread pool that processes items from an unbounded work queue, for
    simple multi-producer, multi-consumer processes.

    The 'workers' argument controls the number of worker threads. The function
    'func' is called on a worker thread and is passed a work item (as well as
    any other positional and keyword parameters passed to the initializer).
    Items are dequeued in the order they were kicked; since several workers run
    concurrently, the order in which they finish is not guaranteed.

    The 'excepthook' property, which defaults to sys.excepthook, is called with
    (type, value, traceback) when the worker function raises an exception
    derived from Exception. The worker thread itself is not terminated.
    Exceptions that derive only from BaseException (e.g. SystemExit) are not
    caught and end the worker thread that raised them.

    Note that stop() must be called to terminate gracefully.
    """

    def __init__(self, workers, func, *args, **kwargs):
        self.excepthook = sys.excepthook
        # Pending work items: producers push on the left, workers pop from the
        # right, so the deque behaves as a FIFO queue.
        self.__items = collections.deque()
        # Guards __items and the __stopped flag.
        self.__mutex = threading.Lock()
        # Counts pending wake-ups: one per kicked item plus one per worker
        # once stop() has been called.
        self.__semaphore = threading.Semaphore(value=0)
        self.__func = func
        self.__args = args
        self.__kwargs = kwargs
        self.__stopped = False
        self.__canceled = False

        threads = []
        # range() rather than xrange() so the pool also works on Python 3.
        for _ in range(workers):
            thread = threading.Thread(target=self.__worker)
            thread.start()
            threads.append(thread)
        self.__threads = threads

    def kick(self, item):
        """Puts an item into the work queue.

        Raises RuntimeError if stop() has already been called.
        """
        with self.__mutex:
            if self.__stopped:
                raise RuntimeError("Cannot kick new work items after stop() has been called")
            self.__items.appendleft(item)

        # Increase the 'pending work' counter. The item is already in the
        # queue, so whichever worker wakes up is guaranteed to find work.
        self.__semaphore.release()

    def stop(self, cancel=False, wait=False):
        """Stops the worker pool from accepting any additional items and allows
        the worker threads to terminate.

        If 'cancel' is true, then the worker threads will terminate as soon as
        possible, without processing every work item. Items that are already
        being processed are not interrupted.

        If 'wait' is true, then stop() will block until every worker thread has
        terminated. Unless 'cancel' is also true, this means every work item
        that was kicked has been processed.
        """
        with self.__mutex:
            self.__stopped = True
            if cancel:
                self.__canceled = True

        # Wake up all sleeping worker threads: one extra semaphore count per
        # worker, so each one eventually observes the empty queue (or the
        # cancel flag) and leaves its loop.
        for _ in self.__threads:
            self.__semaphore.release()

        if wait:
            for thread in self.__threads:
                thread.join()

    def canceled(self):
        "True if the pool has been asked to cancel processing."
        return self.__canceled

    def __worker(self):
        "Worker thread main loop."
        while not self.__canceled:
            # Decrease the 'pending work' counter.
            self.__semaphore.acquire()
            if self.__canceled:
                break

            with self.__mutex:
                # Get a single item from the work queue. If there are no items
                # left in the queue, it must be time to stop.
                if self.__items:
                    item = self.__items.pop()
                else:
                    # Internal invariant: an empty queue can only be observed
                    # after stop() released its wake-up counts.
                    assert self.__stopped
                    break

            # Run the work function outside the mutex so workers execute
            # concurrently and producers are never blocked by a running item.
            try:
                self.__func(item, *self.__args, **self.__kwargs)
            except Exception:
                # Exception (not the Python 2-only StandardError) keeps this
                # handler valid on Python 3; the worker keeps running.
                self.excepthook(*sys.exc_info())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment