Skip to content

Instantly share code, notes, and snippets.

@bshlgrs
Created September 30, 2023 17:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save bshlgrs/d0d002aeabb8c1884786c986af8b7878 to your computer and use it in GitHub Desktop.
Save bshlgrs/d0d002aeabb8c1884786c986af8b7878 to your computer and use it in GitHub Desktop.
import trio
from typing import AsyncIterator, Awaitable, Callable, Coroutine, Optional, TypeVar
# Generic type variables used in the signature of ``bind`` below.
A = TypeVar("A")  # type of the input elements passed to ``fn``
B = TypeVar("B")  # type of the items produced by the async iterators
async def yield_with_delay(val, delay, *, count=5):
    """Yield ``val`` repeatedly, sleeping before each yield.

    Args:
        val: The value to yield on every iteration.
        delay: Duration passed to ``trio.sleep`` before each yield.
        count: How many times to yield ``val``. Defaults to 5, the value
            that was previously hard-coded.

    Yields:
        ``val``, ``count`` times.
    """
    for _ in range(count):
        await trio.sleep(delay)
        yield val
async def bind(xs: list[A], fn: Callable[[A], AsyncIterator[B]]) -> AsyncIterator[B]:
    """Monadic bind for async iterators.

    Starts one Trio task per element of ``xs``; each task iterates
    ``fn(x)`` and forwards every item it produces. The items from all
    tasks are yielded from this generator in the order they become
    available, i.e. interleaved across inputs.

    Args:
        xs: Input values; one producer task is started for each element.
        fn: Maps an input value to an async iterator of results.

    Yields:
        Every item produced by every ``fn(x)``.

    Note:
        This generator yields while a nursery is open. Trio documents
        restrictions on doing so in async generators, so callers should
        iterate it to completion (or close it explicitly) rather than
        abandon it half-way.
    """
    # Capacity 0 makes the channel a rendezvous point: a producer's send()
    # only completes once this generator has received the item, so producers
    # cannot run ahead of the consumer.
    send_channel, receive_channel = trio.open_memory_channel(0)

    async def run(x: A, channel: trio.MemorySendChannel) -> None:
        # Each producer owns one clone of the send side. Closing it on exit
        # (normal or exceptional) is how the receiver learns this producer
        # is finished; no shared counter or sentinel exception is needed.
        async with channel:
            async for result in fn(x):
                await channel.send(result)

    # The receive side is closed only after the nursery has exited, so
    # producers are never woken with a broken-channel error during teardown.
    async with receive_channel, trio.open_nursery() as nursery:
        async with send_channel:
            for x in xs:
                nursery.start_soon(run, x, send_channel.clone())
        # The original send handle is now closed; only the clones remain.
        # Iteration below ends as soon as the last clone is closed, which
        # also covers an empty ``xs`` (the loop finishes immediately).
        async for result in receive_channel:
            yield result
async def bind_main():
    """Demo driver: feed the values 1..5 through ``bind`` and print each item.

    Every input value is turned into a delayed stream by
    ``yield_with_delay``, using a delay proportional to the value itself
    (``value * 0.01``), so streams for smaller values tick faster.
    """

    async def delayed_stream(value):
        # Thin async-generator wrapper that derives the delay from the value.
        pause = value * 0.01
        async for item in yield_with_delay(value, pause):
            yield item

    inputs = [1, 2, 3, 4, 5]
    merged = bind(inputs, delayed_stream)
    async for item in merged:
        print(item)
# Run the demo under Trio. There is no ``__main__`` guard, so this executes
# whenever the module is loaded, not only when it is run as a script.
trio.run(bind_main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment