Skip to content

Instantly share code, notes, and snippets.

@earonesty
Created November 16, 2022 20:42
Show Gist options
  • Save earonesty/c51e1f7a39a4227a49b7b966d9286004 to your computer and use it in GitHub Desktop.
Save earonesty/c51e1f7a39a4227a49b7b966d9286004 to your computer and use it in GitHub Desktop.
class FutureWaitQueue:
"""Wait for futures without making an infinite list.
Any exceptions are ignored at put() time, except for timeout errors, which are raised.
The last exception raised is raised at wait() time
"""
def __init__(self, maxsize, timeout, err_on_timeout=concurrent.futures.TimeoutError):
"""Construct a future wait queue.
:param maxsize: maximum number of entries before waiting is forced
:param timeout: if a single entry takes this long, raise an exception
:param err_on_timeout: exception to raise on timeout
"""
self.maxsize = maxsize
self.timeout = timeout
self.last_ex = None
self.err_on_timeout = err_on_timeout
self.q: Set[Future] = set()
def __wait_one(self):
done, not_done = concurrent.futures.wait(
self.q, return_when=concurrent.futures.FIRST_COMPLETED, timeout=self.timeout
)
self.q = not_done
if not done:
raise self.err_on_timeout
for ent in done:
try:
ent.result()
except Exception as ex:
self.last_ex = ex
def put(self, fut: Optional[Future]):
"""Add a future to the wait queue, if the queue is full, wait for one to complete first."""
if fut:
if len(self.q) >= self.maxsize:
self.__wait_one()
self.q.add(fut)
def wait(self):
"""Wait for everything in the queue."""
for ent in self.q:
try:
ent.result(timeout=self.timeout)
except Exception as ex:
if isinstance(ex, concurrent.futures.TimeoutError):
raise self.err_on_timeout
self.last_ex = ex
if self.last_ex:
raise self.last_ex
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment