Last active
July 1, 2023 03:00
-
-
Save malcolmgreaves/c24058f9db16748c3380a5f59d08631f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from concurrent.futures import ThreadPoolExecutor | |
from threading import BoundedSemaphore, Lock | |
from typing import Optional, Sequence | |
from concurrent_utils import BoundedPoolExecutor | |
__all__: Sequence[str] = ( | |
"BoundedThreadPoolExecutor", | |
"IsDone", | |
) | |
class BoundedThreadPoolExecutor(ThreadPoolExecutor, BoundedPoolExecutor): | |
"""A work pool that uses threads to execute functions asynchronously. | |
Importantly, caps the maximum number of executing tasks by using a thread-safe semaphore. | |
This is useful when one needs to execute many async tasks, but can only have a much smaller | |
set of actively executing tasks at once so as to not overload some external system. | |
""" | |
def __init__(self, max_workers: Optional[int] = None) -> None: | |
super().__init__(max_workers) | |
self.semaphore = BoundedSemaphore(self._max_workers) # type: ignore | |
class IsDone: | |
"""Thread lock around a boolean that can be set to True exactly once. | |
Very useful to signal to a thread that it should exit. Example psuedocode: | |
>>>> should_exit = IsDone() | |
>>>> do_work: Callable[[I], O] = ... | |
>>>> work: Sequence[I] = ... | |
>>>> results: Set[Future] = ... | |
>>>> | |
>>>> | |
>>>> with ThreadPoolExecutor(...) as pool: | |
>>>> | |
>>>> def initate(): | |
>>>> for item in work: | |
>>>> if should_exit.done(): | |
>>>> print("exit signaled! stopping...") | |
>>>> return | |
>>>> results.add(pool.submit(do_work, item)) | |
>>>> print("completed all work normally") | |
>>>> | |
>>>> T = threading.Thread(target=initate) | |
>>>> T.start() | |
>>>> | |
>>>> while True: | |
>>>> .... | |
>>>> if need_to_exit: | |
>>>> should_exit.done() | |
>>>> break | |
>>>> ... | |
>>>> T.join() | |
""" | |
def __init__(self) -> None: | |
"""Initializes internal boolean flag to false and creates threadsafe mutex lock.""" | |
self._is_done = False | |
self._lock = Lock() | |
def done(self) -> None: | |
"""Sets internal state to true, making subsequent :func:`is_done` calls return True. | |
Note: operation acquires internal lock. | |
""" | |
with self._lock: | |
self._is_done = True | |
def is_done(self) -> bool: | |
"""Returns False until :func:`done` is called, then always returns True. | |
Note: operation acquires internal lock. | |
""" | |
with self._lock: | |
is_done = self._is_done | |
return is_done |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment