Skip to content

Instantly share code, notes, and snippets.

@antonagestam
Last active March 11, 2024 07:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save antonagestam/8476ada7d74cce93af0339cf32c62ae2 to your computer and use it in GitHub Desktop.
Save antonagestam/8476ada7d74cce93af0339cf32c62ae2 to your computer and use it in GitHub Desktop.
Merge results from multiple async generators into one single stream.
import asyncio
import random
from typing import TypeVar, AsyncGenerator
T = TypeVar("T")
async def read_into_queue(
task: AsyncGenerator[T, None],
queue: asyncio.Queue[T],
done: asyncio.Semaphore,
) -> None:
async for item in task:
await queue.put(item)
# All items from this task are in the queue, decrease semaphore by one.
await done.acquire()
async def join(*generators: AsyncGenerator[T, None]) -> AsyncGenerator[T, None]:
queue = asyncio.Queue(maxsize=1)
done_semaphore = asyncio.Semaphore(len(generators))
# Read from each given generator into the shared queue.
produce_tasks = [
asyncio.create_task(read_into_queue(task, queue, done_semaphore))
for task in generators
]
# Read items off the queue until it is empty and the semaphore value is down to zero.
while not done_semaphore.locked() or not queue.empty():
try:
yield await asyncio.wait_for(queue.get(), .001)
except TimeoutError:
continue
# Not strictly needed, but usually a good idea to await tasks, they are already finished here.
try:
await asyncio.wait_for(asyncio.gather(*produce_tasks), 0)
except TimeoutError:
raise NotImplementedError("Impossible state: expected all tasks to be exhausted")
# ---
async def produce(i) -> AsyncGenerator[int, None]:
for i in range(i, i + 5):
yield i
await asyncio.sleep(2 * random.random())
async def consume(source: AsyncGenerator[int, None]) -> None:
async for item in source:
print(f"{item=}")
@asyncio.run
@lambda fn: fn()
async def main() -> None:
await consume(
join(
produce(10),
produce(20),
produce(30),
)
)
@Lx
Copy link

Lx commented Mar 9, 2024

I’m keen to learn more about the @asyncio.run and @lambda decorators—there are no obvious Google results for these.

@antonagestam
Copy link
Author

@Lx That's just my lazy way of executing the main function, it's exactly equivalent to:

async def main():
    ...

main = main()  # @lambda fn: fn()
main = asyncio.run(main)  # @asyncio.run

Nothing magic, it's just abusing how decorators work.

@Lx
Copy link

Lx commented Mar 10, 2024

Interesting! I didn’t know that lambda syntax could be used there. Thank you!

@antonagestam
Copy link
Author

@Lx I believe it became possible with this PEP, so it hasn't been legal syntax for very long: https://peps.python.org/pep-0614/

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment