Skip to content

Instantly share code, notes, and snippets.

@davydany
Last active October 25, 2017 20:21
Show Gist options
Save davydany/0befd33c82c5ff9b8b50bba6a261c209 to your computer and use it in GitHub Desktop.
Python Thread Pool

Python Thread Pool

Usage

import requests
from threadpool import ThreadPool

def task(url, timeout=10):
    '''
    Fetch ``url`` and print the URL together with the outcome.

    :param url: address to GET.
    :param timeout: seconds to wait for the server before giving up. Without
        it, requests waits indefinitely, so one stalled host could pin a
        worker thread forever.
    '''
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # Report the failure and return normally rather than raising into the
        # thread pool, so the worker simply moves on to the next URL.
        print(url, exc)
        return
    print(url, response)
    
# Sites to fetch; each entry becomes one queued call to ``task``.
urls = [
    'http://www.google.com',
    'http://www.bing.com',
    'http://www.yahoo.com',
    'http://www.reddit.com',
    'http://www.cnn.com',
    'http://www.lifehacker.com',
    'http://www.redhat.com',
    'http://www.stackoverflow.com',
    'http://www.slack.com',
    'http://www.engadget.com',
]

max_workers = 5
tp = ThreadPool(task, max_workers)
for url in urls:
    tp.add_argument(url)
# Blocks until every queued URL has been processed.
tp.start()
import logging
import threading

try:
    import Queue  # Python 2
except ImportError:
    # Python 3 renamed the module to ``queue``; alias it so the rest of the
    # file can keep referring to ``Queue.Queue`` and ``Queue.Empty``.
    import queue as Queue
# Shared logger for this module. It is named 'threading' (not after this
# module), so that is the name to configure to see its debug messages.
logger = logging.getLogger(name='threading')
class ThreadPool(object):
    '''
    A minimal thread pool that applies one function to many argument sets.

    Queue argument sets with ``add_argument`` and then call ``start``. It
    spawns up to ``max_workers`` daemon ``Worker`` threads, which call
    ``task_function(*args, **kwargs)`` for every queued set. ``start``
    blocks until all workers have finished.
    '''

    def __init__(self, task_function, max_workers, suffix='threadpool-thread'):
        '''
        :param task_function: callable invoked once per queued argument set.
        :param max_workers: upper bound on concurrent worker threads; must be >= 1.
        :param suffix: despite its name, used as the *prefix* of each worker's
            thread name (``<suffix>-1``, ``<suffix>-2``, ...).
        :raises ValueError: if ``max_workers`` is less than 1.
        '''
        # With zero workers ``start`` would return immediately and every
        # queued task would be silently dropped, so reject it up front.
        if max_workers < 1:
            raise ValueError('max_workers must be at least 1, got %r' % (max_workers,))
        self.task = task_function
        self.max_workers = max_workers
        self.suffix = suffix
        self.queue = Queue.Queue()

    def add_argument(self, *args, **kwargs):
        '''
        Queue one argument set. A worker will later call
        ``task_function(*args, **kwargs)`` with exactly these arguments.
        '''
        self.queue.put((args, kwargs))

    def start(self):
        '''
        Start the worker threads and block until all of them have finished.

        ``min(max_workers, number of queued argument sets)`` workers are
        started, so no idle threads are spawned. With an empty queue this
        returns immediately.
        '''
        num_workers = min(self.max_workers, self.queue.qsize())
        workers = [Worker(self.task, self.queue) for _ in range(num_workers)]
        for index, worker in enumerate(workers, 1):
            worker_name = '%s-%d' % (self.suffix, index)
            # Assign the ``name`` attribute: setName() is deprecated since
            # Python 3.10, while the attribute also works on Python 2.6+.
            worker.name = worker_name
            # Daemon threads do not keep the process alive if the main
            # thread exits (e.g. on an interrupt) while tasks are running.
            worker.daemon = True
            worker.start()
            logger.debug("Started Worker '%s'", worker_name)

        # Join with a short timeout in a loop rather than a single blocking
        # join, presumably so the main thread can still react to Ctrl-C.
        while workers:
            for worker in workers:
                worker.join(0.5)
            # Rebuild the list instead of calling remove() while iterating,
            # which would skip the element after each removed worker.
            # is_alive() replaces isAlive(), which Python 3.9 removed.
            workers = [worker for worker in workers if worker.is_alive()]
class Worker(threading.Thread):
    '''
    Thread that drains a shared queue of ``(args, kwargs)`` pairs.

    For each pair it calls ``task(*args, **kwargs)``, and it exits as soon
    as the queue is empty.
    '''

    def __init__(self, task, queue):
        '''
        :param task: callable to run for each queued argument set.
        :param queue: queue of ``(args_tuple, kwargs_dict)`` pairs, shared
            with the other workers.
        '''
        super(Worker, self).__init__()
        self.task = task
        self.queue = queue

    def run(self):
        while True:
            try:
                args, kwargs = self.queue.get_nowait()
            except Queue.Empty:
                # Non-blocking get: ThreadPool fills the queue before starting
                # workers, so an empty queue means all work has been taken.
                break
            try:
                self.task(*args, **kwargs)
            except Exception:
                # Log and keep draining. An uncaught exception would end this
                # thread, and if every worker died that way the remaining
                # queued items would never run.
                logger.exception('Task failed for args=%r kwargs=%r', args, kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment