-
-
Save parity3/dac3f680beb26537b577075413980da8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools | |
import operator | |
from collections import deque | |
import trio | |
import time | |
import outcome | |
async def to_thread_iter_sync(fn, *args, cancellable=False, limiter=None): | |
"""Convert a blocking iteration into an async iteration using a thread. | |
In order to attenuate the overhead of spawning threads and switching | |
contexts, values from the blocking iteration are batched for a time one | |
order of magnitude greater than the spawn time of a thread. | |
""" | |
def run_batch(i_iter, start_time): | |
now = time.monotonic() | |
spawn_time = now - start_time | |
deadline = now + 10 * spawn_time | |
if i_iter is None: | |
i_iter = iter(fn(*args)) | |
times = itertools.takewhile(deadline.__gt__, iter(time.monotonic, None)) | |
items = [] | |
values = map(outcome.Value, map(operator.itemgetter(1), zip(times, i_iter))) | |
try: | |
deque(map(items.append, values), 0) | |
except Exception as exc: | |
items.append(outcome.Error(exc)) | |
return i_iter, items | |
items_iter = None | |
while True: | |
items_iter, batch = await trio.to_thread.run_sync( | |
run_batch, | |
items_iter, | |
time.monotonic(), | |
cancellable=cancellable, | |
limiter=limiter | |
) | |
for result in batch: | |
try: | |
yield result.unwrap() | |
except StopIteration: | |
return |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment