Skip to content

Instantly share code, notes, and snippets.

@malcolmgreaves
Last active July 1, 2023 03:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save malcolmgreaves/c24058f9db16748c3380a5f59d08631f to your computer and use it in GitHub Desktop.
Save malcolmgreaves/c24058f9db16748c3380a5f59d08631f to your computer and use it in GitHub Desktop.
from concurrent.futures import ThreadPoolExecutor
from threading import BoundedSemaphore, Lock
from typing import Optional, Sequence
from concurrent_utils import BoundedPoolExecutor
# Public API of this module: the names exported by a wildcard import
# (`from <module> import *`). Both entries are classes defined below.
__all__: Sequence[str] = (
"BoundedThreadPoolExecutor",
"IsDone",
)
class BoundedThreadPoolExecutor(ThreadPoolExecutor, BoundedPoolExecutor):
    """Thread-backed executor that carries a semaphore sized to its worker count.

    Intended for situations where many asynchronous tasks must be scheduled but
    only a small number may be in flight at once, e.g. to avoid overloading an
    external system. This class only builds the thread pool and creates the
    bounded, thread-safe semaphore; the code that acquires and releases it is
    presumably provided by ``BoundedPoolExecutor`` (defined in
    ``concurrent_utils``, not shown here).

    NOTE(review): ``ThreadPoolExecutor`` precedes ``BoundedPoolExecutor`` in the
    base list, so any method both define resolves to ``ThreadPoolExecutor``'s
    version. Confirm the mixin's use of the semaphore still takes effect.
    """

    def __init__(self, max_workers: Optional[int] = None) -> None:
        """Create the thread pool, then a semaphore with one permit per worker.

        Args:
            max_workers: Upper bound on worker threads. When ``None``, the
                default chosen by ``ThreadPoolExecutor`` is used.
        """
        super().__init__(max_workers)
        # Read the worker count back from the pool rather than using the
        # argument, so that a ``None`` argument resolves to the actual default.
        # ``_max_workers`` is a private attribute, hence the type-ignore.
        permits = self._max_workers  # type: ignore
        self.semaphore = BoundedSemaphore(permits)
class IsDone:
    """Thread-safe, one-way boolean flag: starts False and, once set, stays True.

    Very useful to signal to a thread that it should exit. Calling :func:`done`
    more than once is harmless, and there is no way to reset the flag.

    Example pseudocode::

        should_exit = IsDone()
        do_work: Callable[[I], O] = ...
        work: Sequence[I] = ...
        results: Set[Future] = ...

        with ThreadPoolExecutor(...) as pool:

            def initiate():
                for item in work:
                    # Poll with is_done(); calling done() here would *set* the flag.
                    if should_exit.is_done():
                        print("exit signaled! stopping...")
                        return
                    results.add(pool.submit(do_work, item))
                print("completed all work normally")

            T = threading.Thread(target=initiate)
            T.start()

            while True:
                ...
                if need_to_exit:
                    should_exit.done()
                    break
                ...
            T.join()
    """

    def __init__(self) -> None:
        """Initializes internal boolean flag to false and creates threadsafe mutex lock."""
        self._is_done = False
        self._lock = Lock()

    def done(self) -> None:
        """Sets internal state to true, making subsequent :func:`is_done` calls return True.

        Returns nothing, so it must not be used as a condition; use
        :func:`is_done` to query the flag.

        Note: operation acquires internal lock.
        """
        with self._lock:
            self._is_done = True

    def is_done(self) -> bool:
        """Returns False until :func:`done` is called, then always returns True.

        Note: operation acquires internal lock.
        """
        with self._lock:
            return self._is_done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment