Skip to content

Instantly share code, notes, and snippets.

@willir
Last active August 28, 2019 10:04
Show Gist options
  • Save willir/b521450b66be6e6b238c to your computer and use it in GitHub Desktop.
Save willir/b521450b66be6e6b238c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import signal
def my_handler():
print('Stopping')
for task in asyncio.Task.all_tasks():
task.cancel()
@asyncio.coroutine
def do_some(some_args):
while True:
print("Do staff with %s" % some_args)
yield from asyncio.sleep(2)
@asyncio.coroutine
def do_some2(some_args):
try:
while True:
print("Do staff with %s" % some_args)
yield from asyncio.sleep(2)
except asyncio.CancelledError:
print('Cancelling... %s' % some_args)
yield from asyncio.sleep(2)
print('Cancelled %s' % some_args)
raise
def main():
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, my_handler)
try:
loop.run_until_complete(asyncio.wait([do_some(1), do_some(2), do_some2(1), do_some2(2)]))
except asyncio.CancelledError:
print('Tasks canceled 1')
loop.run_until_complete(asyncio.wait(asyncio.Task.all_tasks()))
finally:
loop.close()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment