Skip to content

Instantly share code, notes, and snippets.

@p-hennessy
Created September 28, 2015 23:14
Show Gist options
  • Save p-hennessy/8ebe87230bd722f887d6 to your computer and use it in GitHub Desktop.
Save p-hennessy/8ebe87230bd722f887d6 to your computer and use it in GitHub Desktop.
import threading
import Queue
import time
# Thread manager
class ThreadPool(object):
    """Bounded task queue served by a list of Worker threads.

    Tasks are zero-argument callables. Producers add them with enqueue();
    workers pull them with getTask(). Both calls block, so the queue bound
    throttles producers and an empty queue parks idle workers.
    """

    # Seconds to wait on each worker between wake-up attempts in joinThreads().
    _JOIN_POLL_SECONDS = 0.1

    def __init__(self, maxTasks=10):
        """Create a pool with no threads and an empty task queue.

        maxTasks -- maximum number of queued tasks before enqueue() blocks
                    (defaults to 10).
        """
        self.threads = []
        self.tasks = Queue.Queue(maxTasks)

    @staticmethod
    def _wakeUp():
        """No-op task whose only purpose is to unblock a waiting worker."""

    def enqueue(self, callable):
        """Queue a zero-argument callable, blocking while the queue is full."""
        # NOTE: the parameter name shadows the builtin `callable`; it is kept
        # so existing keyword callers keep working.
        self.tasks.put(callable, block=True)

    def getTask(self):
        """Remove and return the next task, blocking until one is available."""
        return self.tasks.get(block=True)

    def addThread(self):
        """Create, register and start one more worker thread.

        Workers are numbered from 1 in creation order.
        """
        newThread = Worker(self, len(self.threads) + 1)
        self.threads.append(newThread)
        newThread.start()

    def joinThreads(self):
        """Ask every worker to stop and wait until all of them have exited.

        A worker only re-checks its stop flag between tasks, and it may be
        parked inside the blocking getTask() call. To get it moving again a
        no-op task is fed to the queue while waiting. Tasks that are still
        queued once the workers have stopped are left in the queue.
        """
        # Flag all workers up front so they wind down in parallel instead of
        # one after another.
        for thread in self.threads:
            thread.signalStop()

        for thread in self.threads:
            while thread.is_alive():
                try:
                    # Never block here: if every worker has already exited,
                    # nobody would drain a full queue and we would deadlock.
                    self.tasks.put(self._wakeUp, block=False)
                except Queue.Full:
                    # A full queue already holds something to wake the worker.
                    pass
                thread.join(self._JOIN_POLL_SECONDS)
# Command queue consumer
class Worker(threading.Thread):
    """Thread that repeatedly takes a callable from its pool and runs it.

    Attributes:
        busy    -- True only while a task is being executed.
        running -- loop flag; cleared by signalStop().
        pool    -- object providing a blocking getTask() method.
    """

    def __init__(self, pool, num, idleDelay=0.5):
        """Set up the worker without starting it.

        pool      -- object whose getTask() returns the next callable to run.
        num       -- number used to build the thread name "WorkerThread<num>".
        idleDelay -- seconds to sleep before each fetch (defaults to 0.5).
        """
        # Thread.__init__ already stores the name, so it is not set again.
        super(Worker, self).__init__(name="WorkerThread" + str(num))
        self.busy = False
        self.running = True
        self.pool = pool
        self.idleDelay = idleDelay

    def signalStop(self):
        """Request that the loop in run() ends.

        The flag is only checked between tasks: a worker that is waiting in
        getTask() or executing a task finishes that step first.
        """
        self.running = False

    def run(self):
        """Fetch and execute tasks until signalStop() has been called."""
        # Imported here because only this method needs it.
        import traceback

        while self.running:
            time.sleep(self.idleDelay)
            print(self.name + " getting task.")
            task = self.pool.getTask()
            print(self.name + " executing task.")
            self.busy = True
            try:
                task()
            except Exception:
                # A failing task must not take the whole worker down with it;
                # report the error and carry on with the next task.
                traceback.print_exc()
            finally:
                self.busy = False
def testTask(duration=10):
    """Stand-in for a slow job: sleep for the given time and return None.

    duration -- seconds to sleep; defaults to 10, the original fixed value.
    """
    time.sleep(duration)
if __name__ == '__main__':
    # Demo: start five workers, hand them one task, then three more later.
    threadPool = ThreadPool()

    for _ in range(5):
        threadPool.addThread()

    # Pause for a second before submitting the first task.
    time.sleep(1)
    threadPool.enqueue(testTask)

    # Wait 20 seconds, then submit a burst of three tasks.
    time.sleep(20)
    for _ in range(3):
        threadPool.enqueue(testTask)

    # NOTE(review): the workers are never asked to stop here, so the process
    # presumably keeps running after the last task -- confirm this is intended.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment