Skip to content

Instantly share code, notes, and snippets.

@alkorgun
Created May 1, 2020
Embed
What would you like to do?
asyncio graceful shutdown
import abc
import asyncio
import logging
import os
import signal
__all__ = [
'AppInterface',
'start'
]
logger = logging.getLogger(__name__)
class AppInterface(metaclass=abc.ABCMeta):
@abc.abstractmethod
async def start(self):
return NotImplementedError()
@abc.abstractmethod
def stop(self):
return NotImplementedError()
async def abort():
await asyncio.sleep(30)
logger.critical('aborted')
os._exit(0)
async def shutdown(loop: asyncio.AbstractEventLoop):
skip = loop.create_task(abort())
this = asyncio.current_task(loop)
tasks = []
for task in asyncio.all_tasks(loop):
if task is this or task is skip:
continue
tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True)
for task in asyncio.all_tasks(loop):
task.cancel()
logger.critical('stopped')
loop.stop()
def stop(app: AppInterface, loop: asyncio.AbstractEventLoop):
app.stop()
logger.critical('service shutdown scheduled')
loop.create_task(shutdown(loop))
def fail(app: AppInterface):
def temp(loop: asyncio.AbstractEventLoop, context: dict):
try:
e = context['exception']
except KeyError:
return None
try:
raise e
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
return None
except Exception:
logger.error('unhandled exception:', exc_info=True)
stop(app, loop)
return temp
def start(app: AppInterface):
loop = asyncio.get_event_loop()
loop.create_task(app.start())
loop.add_signal_handler(signal.SIGTERM, stop, app, loop)
loop.set_exception_handler(fail(app))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment