Skip to content

Instantly share code, notes, and snippets.

@rsmoorthy
Last active October 7, 2018 08:49
Show Gist options
  • Save rsmoorthy/39a6f789daeebd870cb2bb2f442647d6 to your computer and use it in GitHub Desktop.
Save rsmoorthy/39a6f789daeebd870cb2bb2f442647d6 to your computer and use it in GitHub Desktop.
Async Timer Example
import asyncio
class AsyncTimer:
def __init__(self, timeout, func, repeat=False, loop=None):
self.loop = loop
self._timeout = timeout
self._func = func
self._repeat = repeat
if loop:
self._task = loop.create_task(self._job())
else:
self._task = asyncio.ensure_future(self._job())
async def _job(self):
while True:
await asyncio.sleep(self._timeout)
await self._func()
if self._repeat == False:
break
def cancel(self):
self._task.cancel()
self._repeat = False
async def mytask(p):
print("task %s" % p)
await asyncio.sleep(3*int(p))
return int(p) * 5
async def main(loop):
tsks = []
tsks.append(loop.create_task(mytask(1)))
timer2 = AsyncTimer(loop=None, timeout=2, func=lambda: mytask(2), repeat=False)
tsks.append(timer2._task)
# With repeat=True - the main loop will never exit
timer3 = AsyncTimer(timeout=3, func=lambda: mytask(3), repeat=True)
tsks.append(timer3._task)
ret = await asyncio.gather(*tsks)
print("All tasks completed %s" % str(ret))
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment