Skip to content

Instantly share code, notes, and snippets.

@mumbleskates
Last active August 30, 2016 02:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mumbleskates/edff5bed3b6d374ed0959c09a1379fb9 to your computer and use it in GitHub Desktop.
Save mumbleskates/edff5bed3b6d374ed0959c09a1379fb9 to your computer and use it in GitHub Desktop.
# coding=utf-8
from mumblecode.multithreading import CloseableQueue
from threading import Thread
from concurrent.futures import Future
import requests
class RequestFuturePool(object):
    """Pool of daemon worker threads that perform HTTP GET requests.

    Requests are submitted with ``put_request()``, which returns a
    ``concurrent.futures.Future``. Each worker takes ``(request, future)``
    pairs from a shared queue, calls the rate limiter, runs
    ``requests.get(request)`` and stores the response (or the exception
    that was raised) on the future.

    Typical lifecycle: ``start()``, any number of ``put_request()`` calls,
    ``stop()``, then ``join()``.
    """

    def __init__(self, pool_size=20, queue_size=64, rate_limiter=lambda: None):
        """
        :param pool_size: number of worker threads created by ``start()``
        :param queue_size: ``maxsize`` passed to the request queue
        :param rate_limiter: zero-argument callable invoked by a worker
            before every request; the default does nothing
        """
        self.pool_size = pool_size
        self.limiter = rate_limiter
        self.running = 0  # number of worker threads started so far
        self.request_queue = CloseableQueue(maxsize=queue_size)
        self.threadpool = ()  # stays an empty tuple until start() is called

    def _worker(self):
        """pool worker: serve queued requests until the queue is closed"""
        while True:
            try:
                request, future = self.request_queue.get()
            except StopIteration:  # input queue has closed
                return  # shutdown thread

            if not future.set_running_or_notify_cancel():
                continue  # request was canceled

            try:
                self.limiter()
                future.set_result(requests.get(request))
            except Exception as ex:
                # Report every failure to the caller through the future
                # instead of letting it kill the worker thread.
                future.set_exception(ex)

    def start(self):
        """
        Start the pool

        Creates ``pool_size`` daemon threads, records them in
        ``self.threadpool`` so that ``join()`` can wait for them, and
        starts each one.

        :raises ValueError: if the pool was already started
        """
        # Compare by value, not identity: ``is not ()`` depends on an
        # interpreter detail. A list never equals a tuple, so this also
        # detects a started pool whose pool_size is 0.
        if self.threadpool != ():
            raise ValueError("threadpool was already started")

        threadpool = [
            Thread(target=self._worker, daemon=True)
            for _ in range(self.pool_size)
        ]
        # Keep the threads; without this join() has nothing to wait for
        # and the guard above can never trigger.
        self.threadpool = threadpool

        for thread in threadpool:
            thread.start()
            self.running += 1

    def stop(self):
        """Stop the pool and prevent any new requests from being submitted"""
        self.request_queue.close()

    def put_request(self, request):
        """
        Put a single request into the queue and return a Future object
        representing its result

        :param request: value handed to ``requests.get`` by a worker
        :return: a ``Future`` that receives the response or the exception
        :raises RuntimeError: if no worker thread has been started
        """
        # NOTE(review): ``running`` is never decremented, so this check does
        # not fire after stop(); what happens then depends on how
        # CloseableQueue.put treats a closed queue - confirm.
        if not self.running:
            raise RuntimeError("threadpool is stopped")
        future = Future()
        self.request_queue.put((request, future))
        return future

    def join(self):
        """
        Block until all worker threads have finished

        Workers only exit once the request queue has been closed, so call
        ``stop()`` before joining.
        """
        for thread in self.threadpool:
            thread.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment