Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save malcolmgreaves/e28858ce07212a6a1ef841f109ef3900 to your computer and use it in GitHub Desktop.
Save malcolmgreaves/e28858ce07212a6a1ef841f109ef3900 to your computer and use it in GitHub Desktop.
import threading
from abc import ABC
from concurrent.futures import Executor, Future
from multiprocessing import synchronize
from typing import Optional, Sequence, Union
__all__: Sequence[str] = ("BoundedPoolExecutor",)
class BoundedPoolExecutor(ABC, Executor):
"""A workpool that limits the number of active work items using an internal semaphore.
Note: implementations must also implement concurrent.futures.Executor !
"""
semaphore: Optional[Union[threading.Lock, synchronize.SemLock]] = None
def _acquire(self) -> None:
"""Acquires a lock on the internal semaphore."""
assert self.semaphore is not None
self.semaphore.acquire()
def _release(self, _=None) -> None:
"""Releases the internal semaphore. Ignores input: necessary to accept one input for future callback."""
assert self.semaphore is not None
self.semaphore.release()
def submit(self, fn, *args, **kwargs) -> Future:
"""Submits the function & its input for Asynchronous execution.
Will wait to run until there is a free worker: this `submit` will only work on
"""
self._acquire()
try:
# from concurrent.futures.Executor
future: Future = super().submit(fn, *args, **kwargs) # type: ignore
except Exception: # noqa
self._release()
raise
else:
future.add_done_callback(self._release)
return future
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment