Skip to content

Instantly share code, notes, and snippets.

@multun
Created April 15, 2020 19:21
Show Gist options
  • Save multun/618e833116b37068d802440bf68f2b1e to your computer and use it in GitHub Desktop.
Save multun/618e833116b37068d802440bf68f2b1e to your computer and use it in GitHub Desktop.
import asyncio
async def merge_iter(iterables):
    """Merge several async iterables into a single async stream.

    Exactly one ``__anext__()`` call per source is in flight at any time.
    An item is yielded as soon as any source produces one, so items coming
    from different sources interleave in completion order, while the items
    of one source keep their relative order.

    A source that raises ``StopAsyncIteration`` is simply dropped; the
    merged stream ends once every source has been dropped.

    Any other exception raised by a source propagates to the consumer.
    Whenever the generator exits while tasks are still outstanding (source
    error, cancellation, or the consumer closing the generator early), the
    outstanding tasks are cancelled and awaited before the exit completes.

    Args:
        iterables: An iterable of objects implementing ``__aiter__()``.

    Yields:
        The items produced by the sources.
    """
    # Create async iterators from the iterables. Nothing is scheduled yet,
    # so a failure here leaves nothing behind to clean up.
    iterators = [iterable.__aiter__() for iterable in iterables]

    # Maps each in-flight __anext__() task to the iterator it belongs to.
    tasks = {}

    def _schedule_next(iterator):
        tasks[asyncio.ensure_future(iterator.__anext__())] = iterator

    async def _cancel_tasks():
        for task in tasks:
            task.cancel()
        # return_exceptions=True: we only want to wait for the tasks to
        # settle (and mark their outcomes as retrieved), not re-raise them.
        await asyncio.gather(*tasks, return_exceptions=True)

    try:
        # Scheduling happens inside the try block so that a synchronous
        # failure on a later source still cancels the tasks created so far.
        for iterator in iterators:
            _schedule_next(iterator)

        while tasks:
            # Wait for at least one source to produce something.
            done_tasks, _pending_tasks = await asyncio.wait(
                tasks.keys(), return_when=asyncio.FIRST_COMPLETED
            )
            for done_task in done_tasks:
                # Only the result lookup is guarded: a StopAsyncIteration
                # thrown into this generator at the ``yield`` below must
                # not be mistaken for the source being exhausted.
                try:
                    item = done_task.result()
                except StopAsyncIteration:
                    del tasks[done_task]
                    continue
                yield item
                # Replace the finished task with a fresh __anext__() call
                # on the iterator that produced it.
                _schedule_next(tasks.pop(done_task))
    finally:
        # ``tasks`` is empty on normal completion, so this only runs on an
        # abnormal exit. The shield lets the cleanup run to completion even
        # if the task driving this generator is itself being cancelled.
        if tasks:
            await asyncio.shield(_cancel_tasks())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment