Skip to content

Instantly share code, notes, and snippets.

@LostLuma
Last active March 14, 2024 23:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save LostLuma/57d6275026cd7275d681f5b56fcdeb68 to your computer and use it in GitHub Desktop.
Save LostLuma/57d6275026cd7275d681f5b56fcdeb68 to your computer and use it in GitHub Desktop.
Combines multiple async generators into one continuous stream.
import asyncio
# Sentinel a wrapped ``__anext__`` task resolves to once its generator is
# exhausted. Compared by identity, so it cannot collide with a real item.
GENERATOR_CONCLUDED = object()


def wrap_silent_anext(generator):
    """Schedule a single ``__anext__()`` call on *generator* as a task.

    The returned task resolves to the generator's next item, or to
    ``GENERATOR_CONCLUDED`` when the generator raises
    ``StopAsyncIteration``. Any other exception raised by the generator
    is left on the task and surfaces when the task's result is read.

    The source generator is attached to the task as ``task.generator`` so
    the caller can schedule the following item from the same source.

    Uses ``asyncio.create_task``, so it must be called while an event loop
    is running.
    """

    async def silent_anext():
        try:
            return await generator.__anext__()
        except StopAsyncIteration:
            return GENERATOR_CONCLUDED

    task = asyncio.create_task(silent_anext())
    task.generator = generator  # type: ignore
    return task


async def combine_async_generators(*generators):
    """Merge several async generators into a single stream of items.

    One ``__anext__`` task per source is kept in flight; every item is
    yielded as soon as its task completes, and the source is then
    re-scheduled. The stream ends when every source is exhausted. An
    exception raised by a source propagates to the consumer.

    If iteration stops early (the consumer closes this generator, a source
    raises, or the surrounding task is cancelled), the tasks still in
    flight are cancelled and awaited so none keeps running in the
    background.
    """
    pending = {wrap_silent_anext(source) for source in generators}
    try:
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                # The task is already finished, so this never blocks; it
                # re-raises whatever exception the source raised.
                result = task.result()
                if result is not GENERATOR_CONCLUDED:
                    yield result
                    # Only sources that produced an item are re-scheduled;
                    # concluded ones simply drop out of the pending set.
                    pending.add(wrap_silent_anext(task.generator))  # type: ignore
    finally:
        # On normal exhaustion ``pending`` is empty and this is a no-op.
        for task in pending:
            task.cancel()
        if pending:
            # Wait until the cancellations have been delivered.
            # ``return_exceptions=True`` keeps a CancelledError (or an
            # error raised during a source's cleanup) from masking the
            # exception that is already propagating.
            await asyncio.gather(*pending, return_exceptions=True)
async def one():
    """Demo source: emit 1, pause for one second, then emit 2."""
    for position, value in enumerate((1, 2)):
        if position:
            # Only the second item is preceded by the one-second pause.
            await asyncio.sleep(1)
        yield value
async def two():
    """Demo source: emit 3, pause for two seconds, then emit 4."""
    opening, closing = 3, 4
    pause_seconds = 2
    yield opening
    await asyncio.sleep(pause_seconds)
    yield closing
async def main():
    """Demo: print every item from ``one()`` and ``two()`` as it arrives."""
    async for item in combine_async_generators(one(), two()):
        print(item)


# Run the demo only when executed as a script, so importing this module for
# its helpers does not start an event loop or block on the demo's sleeps.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment