Skip to content

Instantly share code, notes, and snippets.

@mrzechonek
Created May 21, 2020 21:12
Show Gist options
  • Save mrzechonek/24fa982987f2d69ac5a7940aff1d6c94 to your computer and use it in GitHub Desktop.
Save mrzechonek/24fa982987f2d69ac5a7940aff1d6c94 to your computer and use it in GitHub Desktop.
import asyncio
import itertools
async def repeat(value, *, offset, interval, count=None):
await asyncio.sleep(offset)
for i in (range(count) if count else itertools.cycle([None])):
yield value
await asyncio.sleep(interval)
async def merge(*gens):
q = asyncio.Queue()
loop = asyncio.get_event_loop()
pending = set()
done = dict()
async def iterate(gen):
async for i in gen:
await q.put((gen, i))
def finish(task, gen):
print('DONE', task)
pending.remove(task)
done[task] = gen
for gen in gens:
task = loop.create_task(iterate(gen))
task.add_done_callback(finish, context=gen)
pending.add(task)
while True:
i = await q.get()
for task, gen in done:
result = await task
yield gen, result
yield i
async def main():
events = [repeat('on', offset=0.0, interval=2.0), repeat('off', offset=1.0, interval=2.0), repeat('bzzt', offset=3.0, interval=3.0, count=2)]
async for task, event in merge(*events):
print(task, event)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment