Skip to content

Instantly share code, notes, and snippets.

@mrzechonek
Created December 18, 2019 22:51
Show Gist options
  • Save mrzechonek/8aadb9447bb3d4bb4919210537b11a60 to your computer and use it in GitHub Desktop.
Save mrzechonek/8aadb9447bb3d4bb4919210537b11a60 to your computer and use it in GitHub Desktop.
import asyncio
import functools
def igather(*awaitables, loop=None, timeout=None):
loop = loop or asyncio.get_event_loop()
queue = asyncio.Queue()
cancelled = False
remaining = set(map(asyncio.ensure_future, awaitables))
def done(future):
remaining.remove(future)
if not future.cancelled():
queue.put_nowait(future.exception() or future.result())
def cancel():
nonlocal cancelled
cancelled = True
queue.put_nowait(None)
if timeout is not None:
loop.call_later(timeout, cancel)
for future in remaining:
future.add_done_callback(done)
class IterGather:
async def __aiter__(self):
return self
async def __anext__(self):
if cancelled:
try:
return queue.get_nowait()
except asyncio.QueueEmpty:
raise TimeoutError from None
if not remaining:
raise StopAsyncIteration
result = await queue.get()
if cancelled:
for a in remaining:
a.cancel()
for r in await asyncio.gather(*remaining, return_exceptions=True):
queue.put_nowait(r)
result = await queue.get()
return result
return IterGather()
async def sleep(key, timeout, exception=False):
await asyncio.sleep(timeout)
if exception:
raise Exception(exception)
return key
async def main():
async for result in igather(sleep('foo', 3), sleep('bar', 2, 'dupa'), sleep('qux', 5), sleep('doo', 10), timeout=4.0):
print(type(result), result)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment