Skip to content

Instantly share code, notes, and snippets.

@noxdafox
Created April 15, 2018 17:58
Show Gist options
  • Save noxdafox/4150eff0059ea43f6adbdd66e5d5e87e to your computer and use it in GitHub Desktop.
Save noxdafox/4150eff0059ea43f6adbdd66e5d5e87e to your computer and use it in GitHub Desktop.
This code snippet shows how to wrap a concurrent.futures.Executor class to provide a limited queue size.
from threading import BoundedSemaphore
from concurrent.futures import ProcessPoolExecutor
class MaxQueuePool:
"""This Class wraps a concurrent.futures.Executor
limiting the size of its task queue.
If `max_queue_size` tasks are submitted, the next call to submit will block
until a previously submitted one is completed.
"""
def __init__(self, executor, max_queue_size, max_workers=None):
self.pool = executor(max_workers=max_workers)
self.pool_queue = BoundedSemaphore(max_queue_size)
def submit(self, function, *args, **kwargs):
"""Submits a new task to the pool, blocks if Pool queue is full."""
self.pool_queue.acquire()
future = self.pool.submit(function, *args, **kwargs)
future.add_done_callback(self.pool_queue_callback)
return future
def pool_queue_callback(self, _):
"""Called once task is done, releases one queue slot."""
self.pool_queue.release()
if __name__ == '__main__':
pool = MaxQueuePool(ProcessPoolExecutor, 8)
f = pool.submit(print, "Hello World!")
f.result()
@EvsanDlg
Copy link

EvsanDlg commented Dec 1, 2020

@noxdafox could you explain some point, pls? I need to do some work upon the result of each future. What is the best way for doing this: to add extra callable to future.add_done_callback in the custom submit method or to use smth like as_completed() in the main pipeline? Is it possible to make it non blocking the queue filling?

@noxdafox
Copy link
Author

noxdafox commented Dec 1, 2020

Sorry but this is not a place for such discussions.

Please use stackoverflow for that. Create a question stating what you are trying to achieve, what you have been doing so far and where you are having troubles/being stuck. Please tag it as multiprocessing, I usually am pretty active there.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment