Skip to content

Instantly share code, notes, and snippets.

@frankcleary
Last active June 28, 2023 10:56
Show Gist options
  • Star 12 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save frankcleary/f97fe244ef54cd75278e521ea52a697a to your computer and use it in GitHub Desktop.
Save frankcleary/f97fe244ef54cd75278e521ea52a697a to your computer and use it in GitHub Desktop.
Python ThreadPoolExecutor with bounded queue
from concurrent.futures import ThreadPoolExecutor
from threading import BoundedSemaphore
class BoundedExecutor:
    """Thread pool wrapper whose submit() blocks once the work queue is full.

    BoundedExecutor behaves as a ThreadPoolExecutor which will block on
    calls to submit() once the limit given as "bound" work items are queued
    for execution.

    It can also be used as a context manager; leaving the ``with`` block
    calls ``shutdown(wait=True)``.

    :param bound: Integer - the maximum number of items in the work queue
    :param max_workers: Integer - the size of the thread pool
    """

    def __init__(self, bound, max_workers):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        # One permit per unfinished work item: sized so that max_workers
        # items can be running while up to "bound" more wait in the queue.
        self.semaphore = BoundedSemaphore(bound + max_workers)

    def __enter__(self):
        """Return this executor for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Shut the pool down, waiting for pending work; never suppress errors."""
        self.shutdown(wait=True)
        return False

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)``, blocking while no permit is free.

        See concurrent.futures.Executor#submit

        :param fn: the callable to run in the pool
        :returns: the Future returned by the wrapped ThreadPoolExecutor
        :raises: whatever the wrapped executor's submit() raises; the permit
            taken for this call is returned before the error propagates
        """
        self.semaphore.acquire()
        try:
            future = self.executor.submit(fn, *args, **kwargs)
        except BaseException:
            # Nothing was queued, so give the permit back before re-raising;
            # otherwise every failed submit would permanently shrink capacity.
            self.semaphore.release()
            raise
        # The permit is handed back when the future finishes, via the done
        # callback, which frees a slot for the next blocked submit() call.
        future.add_done_callback(lambda _: self.semaphore.release())
        return future

    def shutdown(self, wait=True):
        """Shut down the wrapped thread pool.

        See concurrent.futures.Executor#shutdown

        :param wait: passed through to ThreadPoolExecutor.shutdown()
        """
        self.executor.shutdown(wait)
import time
from boundedexecutor import BoundedExecutor
def work():
    """Simulate one unit of work: sleep for a second, then print a notice."""
    pause_seconds = 1
    time.sleep(pause_seconds)
    print('work done')
if __name__ == '__main__':
    # Demo: submit 1000 one-second jobs to a pool of 200 workers with a
    # queue bound of 10. submit() blocks whenever all permits are taken, so
    # the 'work submitted' lines are throttled by the pool's progress.
    executor = BoundedExecutor(10, 200)
    try:
        for _ in range(1000):
            executor.submit(work)
            print('work submitted')
    finally:
        # Shut the pool down explicitly, even if submission is interrupted,
        # and wait for outstanding work to finish before exiting.
        executor.shutdown(wait=True)
@4l1fe
Copy link

4l1fe commented Sep 13, 2019

Thanks, this is helpful. I need this functionality because I can otherwise overflow the job queue inside a pool.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment