Skip to content

Instantly share code, notes, and snippets.

@santiagobasulto
Last active June 26, 2021 13:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save santiagobasulto/10b689ba5fcadf307ffc5cd4f4ae00ec to your computer and use it in GitHub Desktop.
Save santiagobasulto/10b689ba5fcadf307ffc5cd4f4ae00ec to your computer and use it in GitHub Desktop.
# This is a proof of concept of a wait function that preserves
# the order of the futures.
#
# I'm working on a library on top of concurrent.futures
# and I needed a `wait` function that would preserve the order of the futures passed
# so I wrote a simple `wait_in_order` function.
# Using this also makes `ThreadPoolExecutor.map`'s implementation simpler.
import concurrent.futures
from concurrent.futures._base import (
    _create_and_install_waiters, TimeoutError,
    _AcquireFutures, CANCELLED_AND_NOTIFIED, FINISHED,
    ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION)
def wait_in_order(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait on futures like ``concurrent.futures.wait`` but keep their order.

    Args:
        fs: Iterable of futures. A list or tuple is used (and returned)
            as-is; any other iterable is materialized into a list first,
            because the futures are walked several times below.
        timeout: Maximum number of seconds to block, or None to block until
            the ``return_when`` condition is met.
        return_when: One of FIRST_COMPLETED, FIRST_EXCEPTION or
            ALL_COMPLETED.

    Returns:
        The futures in the order they were given. There is no done/not-done
        split: if the timeout elapses first, some of the returned futures
        may still be pending, so callers must inspect each one.
    """
    # One-shot iterables (generators, iterators) would be exhausted by the
    # first pass, making every later pass see an empty sequence.
    if not isinstance(fs, (list, tuple)):
        fs = list(fs)

    done_states = {CANCELLED_AND_NOTIFIED, FINISHED}
    with _AcquireFutures(fs):
        finished = [f for f in fs if f._state in done_states]

        # Everything already done satisfies every return_when mode.
        if len(finished) == len(fs):
            return fs
        if return_when == FIRST_COMPLETED and finished:
            return fs
        if return_when == FIRST_EXCEPTION and any(
                not f.cancelled() and f.exception() is not None
                for f in finished):
            return fs

        # Install the waiter while the futures are still locked so that no
        # completion can slip in between the checks above and the install.
        waiter = _create_and_install_waiters(fs, return_when)

    try:
        waiter.event.wait(timeout)
    finally:
        # Always detach the waiter, even if the wait was interrupted,
        # so it does not stay referenced by every future.
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
    return fs
class NewThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
    """ThreadPoolExecutor whose ``map`` is built on ``wait_in_order``."""

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Run ``fn`` over the zipped ``iterables`` and yield results in order.

        Every task is submitted up front and this call blocks until all of
        them have finished or ``timeout`` elapses; only then is the result
        generator returned.

        Args:
            fn: Callable submitted once per tuple of ``zip(*iterables)``.
            *iterables: Argument iterables, consumed in lockstep.
            timeout: Maximum number of seconds to wait for all tasks, or
                None for no limit.
            chunksize: Accepted for signature compatibility; not used here.

        Returns:
            A generator yielding each task's result in submission order.
            A task's exception is re-raised when its result is reached.

        Raises:
            TimeoutError: If not every task finished within ``timeout``.
                Tasks that have not started yet are cancelled first.
        """
        fs = wait_in_order(
            [self.submit(fn, *args) for args in zip(*iterables)],
            timeout=timeout, return_when=ALL_COMPLETED)
        if not all(f.done() for f in fs):
            # Don't leave abandoned work queued in the pool after giving up.
            for f in fs:
                f.cancel()
            raise TimeoutError()
        return (f.result() for f in fs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment