Skip to content

Instantly share code, notes, and snippets.

@mumbleskates
Last active August 30, 2016 02:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mumbleskates/534e42514ead3cf2357f1c2d7ac12cf0 to your computer and use it in GitHub Desktop.
Save mumbleskates/534e42514ead3cf2357f1c2d7ac12cf0 to your computer and use it in GitHub Desktop.
# coding=utf-8
from mumblecode.multithreading import CloseableQueue
from queue import Queue, Empty
from threading import Thread
import requests
# Sentinel a worker pushes onto the result queue when it sees that the request
# queue has closed; it never corresponds to a submitted request.
_STOP = object()


class RequestPool(object):
    """
    A fixed-size pool of threads that perform ``requests.get`` calls.

    Requests (whatever is accepted as the first argument of ``requests.get``,
    presumably URLs) are submitted with put_request() / put_all(), and results
    are collected with get_result(). Each result is either the value returned
    by ``requests.get`` or an ``(exception, request)`` tuple when the rate
    limiter or the request raised.
    """

    def __init__(self, pool_size=20, queue_size=64, rate_limiter=lambda: None):
        """
        :param pool_size: number of worker threads started by start_pool()
        :param queue_size: maxsize of both the request and the result queue
        :param rate_limiter: zero-argument callable every worker invokes
            before each request (presumably blocks to throttle the pool)
        """
        self.pool_size = pool_size
        self.limiter = rate_limiter
        # workers started and not yet reported stopped via get_result()
        self.running = 0
        self.request_queue = CloseableQueue(maxsize=queue_size)
        self.result_queue = Queue(maxsize=queue_size)
        # empty until start_pool() runs; holds the worker threads afterwards
        self.threadpool = ()

    def _worker(self):
        """Pool worker: perform requests until the request queue closes."""
        while True:
            try:
                request = self.request_queue.get()
            except StopIteration:  # input queue has closed
                # signal that this thread is shutting down
                self.result_queue.put(_STOP)
                return  # shutdown thread
            try:
                self.limiter()
                result = requests.get(request)
            except Exception as ex:
                # Deliberately hand every failure to the consumer together
                # with the request that caused it, rather than killing the
                # worker thread.
                self.result_queue.put((ex, request))
            else:
                self.result_queue.put(result)

    def start_pool(self):
        """
        Start the pool's worker threads.

        :raises ValueError: if the pool has already been started
        """
        if self.threadpool:
            raise ValueError("threadpool was already started")
        # Remember the threads so that a second call is rejected above.
        self.threadpool = tuple(
            Thread(target=self._worker, daemon=True)
            for _ in range(self.pool_size)
        )
        for thread in self.threadpool:
            thread.start()
            self.running += 1

    def stop_pool(self):
        """
        Stop the pool and prevent any new requests from being submitted.

        ``running`` only drops to zero once get_result() has collected the
        stop signal of every worker.
        """
        self.request_queue.close()

    def put_request(self, request):
        """
        Put a single request into the queue.

        :raises RuntimeError: if the pool is not running
        """
        if not self.running:
            raise RuntimeError("threadpool is stopped")
        self.request_queue.put(request)

    def put_all(self, request_iterable):
        """
        Put every request in an iterable into the queue by spawning
        a new thread to insert them.

        :raises RuntimeError: if the pool is not running
        """
        if not self.running:
            raise RuntimeError("threadpool is stopped")

        def putter():
            try:
                for request in request_iterable:
                    self.request_queue.put(request)
            except ValueError:  # input queue is closed
                return
            finally:
                # we are done stuffing items into the queue, finish the
                # imaginary task registered by put_all()
                self.request_queue.task_done()

        # Add an imaginary task to the request queue to signal that we are
        # still trying to stuff more items in. This is done *before* the
        # putter thread starts, so is_idle() cannot return a false positive
        # in the window before the putter runs or while it is starved.
        with self.request_queue.all_tasks_done:
            self.request_queue.unfinished_tasks += 1
        try:
            Thread(target=putter, daemon=True).start()
        except BaseException:
            # the putter never ran, so nobody else will finish the task
            self.request_queue.task_done()
            raise

    def get_result(self, wait=True):
        """
        Return a result from the request pool or a tuple containing a
        resulting error and the original request that caused it.

        Raises StopIteration if the threadpool is not running.
        If wait is set to False, returns immediately or raises queue.Empty
        to signify no results are ready.
        """
        while self.running:
            result = self.result_queue.get(wait)  # may raise queue.Empty
            if result is _STOP:  # a worker thread has stopped
                # _STOP was never submitted as a request, so it must not be
                # counted against the request queue's unfinished tasks.
                self.running -= 1
            else:
                # exactly one submitted request is now fully processed
                self.request_queue.task_done()
                return result
        raise StopIteration("threadpool is stopped")

    def is_idle(self):
        """
        Return True iff there are no in-flight requests.

        False may be returned as the last request is just finishing. If the pool
        is in use by multiple threads, True may not be accurate if another thread
        just submitted a request. If the pool is only used by a single thread that
        is no longer submitting requests, or the pool has been stopped, True will
        always indicate that the pool is truly idle.
        """
        return not self.request_queue.unfinished_tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment