Skip to content

Instantly share code, notes, and snippets.

@kleontev
Created February 23, 2022 04:56
Show Gist options
  • Save kleontev/823121ab854410c5c71d04c5bc3f6bc5 to your computer and use it in GitHub Desktop.
Save kleontev/823121ab854410c5c71d04c5bc3f6bc5 to your computer and use it in GitHub Desktop.
a version of concurrent.futures.as_completed that doesn't consume the whole iterable of futures before producing the first one.
from concurrent.futures import Future, wait, FIRST_COMPLETED
from typing import Iterable
def as_completed(fs: Iterable[Future], buffer: int = 5):
    """Yield futures from *fs* as they complete, consuming *fs* lazily.

    Self-throttling version of ``concurrent.futures.as_completed``.
    Contrary to the standard library version, it does not consume the
    whole iterable of futures up front: at most ``buffer`` futures are
    pulled from *fs* and awaited at any one time, and more are pulled
    only after some of the awaited ones have been yielded.

    Args:
        fs: Iterable (possibly a lazy generator) producing futures.
        buffer: Maximum number of not-yet-completed futures to hold at
            once. Must be at least 1.

    Yields:
        Each future once it is finished or cancelled. Futures within one
        completed batch are yielded in arbitrary (set) order.

    Raises:
        ValueError: If ``buffer`` is less than 1. Because this is a
            generator, the error surfaces on the first iteration step.
    """
    if buffer < 1:
        # With a non-positive buffer nothing would ever be pulled from
        # ``fs`` and the loop below would spin forever without yielding.
        raise ValueError(f"buffer must be >= 1, got {buffer!r}")

    source = iter(fs)
    in_flight = set()
    exhausted = False

    while not exhausted or in_flight:
        if not exhausted:
            # Top the working set back up to ``buffer`` futures.
            for _ in range(buffer - len(in_flight)):
                try:
                    in_flight.add(next(source))
                except StopIteration:
                    exhausted = True
                    break

        if not in_flight:
            # Source ran dry and nothing is outstanding: we are done.
            break

        # Block until at least one awaited future finishes; whatever is
        # still running stays in the working set for the next round.
        done, in_flight = wait(
            fs=in_flight,
            timeout=None,
            return_when=FIRST_COMPLETED,
        )
        yield from done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment