Skip to content

Instantly share code, notes, and snippets.

@jgarvin
Created December 3, 2023 16:46
Show Gist options
  • Save jgarvin/5a0ae2d5ac8a97fb3170cab3d88144a5 to your computer and use it in GitHub Desktop.
Save jgarvin/5a0ae2d5ac8a97fb3170cab3d88144a5 to your computer and use it in GitHub Desktop.
asyncio won't automatically shutdown all tasks huh...
import asyncio
import signal
import logging as log
def cancel_all_tasks(loop: asyncio.AbstractEventLoop) -> None:
log.info("SIGINT received, cancelling tasks...")
tasks = asyncio.all_tasks(loop)
for task in asyncio.all_tasks(loop):
task.cancel()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, cancel_all_tasks, loop)
loop.add_signal_handler(signal.SIGTERM, cancel_all_tasks, loop)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment