Skip to content

Instantly share code, notes, and snippets.

@malcolmgreaves
Created July 1, 2023 03:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save malcolmgreaves/96d1e1726cdee6c7127e95667aa36e77 to your computer and use it in GitHub Desktop.
Save malcolmgreaves/96d1e1726cdee6c7127e95667aa36e77 to your computer and use it in GitHub Desktop.
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import BoundedSemaphore
from typing import Optional, Sequence
from concurrent_utils import BoundedPoolExecutor
__all__: Sequence[str] = ("BoundedProcessPoolExecutor",)
class BoundedProcessPoolExecutor(ProcessPoolExecutor, BoundedPoolExecutor):
"""A work pool that uses processes to execute functions asynchronously.
Importantly, caps the maximum number of executing tasks by using a process-safe semaphore.
This is useful when one needs to execute many async tasks, but can only have a much smaller
set of actively executing tasks at once so as to not overload some external system.
"""
def __init__(self, max_workers: Optional[int] = None) -> None:
super().__init__(max_workers)
self.semaphore = BoundedSemaphore(self._max_workers) # type: ignore
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment