Skip to content

Instantly share code, notes, and snippets.

@parity3
Last active December 26, 2019 05:19
Show Gist options
  • Save parity3/dac3f680beb26537b577075413980da8 to your computer and use it in GitHub Desktop.
Save parity3/dac3f680beb26537b577075413980da8 to your computer and use it in GitHub Desktop.
import itertools
import operator
from collections import deque
import trio
import time
import outcome
async def to_thread_iter_sync(fn, *args, cancellable=False, limiter=None):
    """Convert a blocking iteration into an async iteration using a thread.

    In order to attenuate the overhead of spawning threads and switching
    contexts, values from the blocking iteration are batched for a time one
    order of magnitude greater than the spawn time of a thread.

    Args:
        fn: Callable returning the blocking iterable. It is invoked as
            ``fn(*args)`` inside the worker thread when the first batch runs.
        *args: Positional arguments passed to ``fn``.
        cancellable: Forwarded unchanged to ``trio.to_thread.run_sync``.
            NOTE(review): newer trio releases appear to rename this keyword
            to ``abandon_on_cancel`` -- confirm against the pinned version.
        limiter: Forwarded unchanged to ``trio.to_thread.run_sync``.

    Yields:
        The items produced by the blocking iterator, in their original order.
        The generator finishes once the blocking iterator is exhausted.

    Raises:
        Exception: Any ``Exception`` raised while advancing the blocking
            iterator is re-raised here, after the items that preceded it in
            the same batch have been yielded.
    """

    def run_batch(i_iter, start_time):
        """Pull one time-boxed batch of items; runs in the worker thread.

        Returns ``(iterator, outcomes, exhausted)``. ``outcomes`` holds
        ``outcome.Value`` entries, optionally followed by one
        ``outcome.Error``. ``exhausted`` is True when no further batch
        should be requested.
        """
        now = time.monotonic()
        # Time elapsed between scheduling the call and the thread running it.
        spawn_time = now - start_time
        deadline = now + 10 * spawn_time
        if i_iter is None:
            # First batch: create the iterator in the worker thread.
            i_iter = iter(fn(*args))
        items = []
        exhausted = False
        try:
            while True:
                # Fetch before checking the clock so that every batch makes
                # progress even when the measured spawn time is zero.
                items.append(outcome.Value(next(i_iter)))
                if time.monotonic() >= deadline:
                    break
        except StopIteration:
            # Exhaustion must be reported explicitly; otherwise the async
            # side would keep requesting empty batches forever.
            exhausted = True
        except Exception as exc:
            # Deliver the failure in order, after the values that preceded it.
            items.append(outcome.Error(exc))
            exhausted = True
        return i_iter, items, exhausted

    items_iter = None
    exhausted = False
    while not exhausted:
        items_iter, batch, exhausted = await trio.to_thread.run_sync(
            run_batch,
            items_iter,
            time.monotonic(),
            cancellable=cancellable,
            limiter=limiter,
        )
        for result in batch:
            # unwrap() returns the value or re-raises the captured exception.
            yield result.unwrap()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment