- Things tried and explanation of the problem/solution.
- OLD CODE TESTING SCRATCH PAD -- Python Tornado Graceful Shutdown.py
- Working Example -- Python 3.7 Tornado 5+ Example with tests.md
Working code (Python 3.7+)...
def sig_handler(server, sig, frame):
    io_loop = IOLoop.current()

    def stop_loop(deadline):
        now = time.time()
        if now < deadline and len(asyncio.all_tasks()) > 0:
            logging.info('waiting for next event loop tick')
            io_loop.add_timeout(now + 1, stop_loop, deadline)
        else:
            io_loop.stop()
            logging.info('event loop stopped')

    async def shutdown():
        logging.info('stop listening for new connections')
        server.stop()
        logging.info(f'stopping event loop in {SHUTDOWN_TIMEOUT} seconds')
        # await server.close_all_connections() << don't use unless you WANT to kill all client connections immediately
        stop_loop(time.time() + SHUTDOWN_TIMEOUT)

    io_loop.add_callback_from_signal(shutdown)
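The deadline-polling idea in the handler above can be tried without Tornado. The following is a minimal sketch using only the standard library's asyncio, not the code from this gist: `drain` and `demo` are hypothetical names, and the polling interval is an arbitrary choice. It waits until no other tasks remain on the running loop or the timeout expires, whichever comes first.

```python
import asyncio
import time


async def drain(timeout: float, poll_interval: float = 0.05) -> bool:
    """Wait until no other tasks remain or the timeout expires.

    Returns True if every other task finished before the deadline.
    (Hypothetical helper, for illustration only.)
    """
    deadline = time.monotonic() + timeout
    current = asyncio.current_task()
    while True:
        # all_tasks() with no argument inspects the running loop and
        # only returns tasks that are not yet done.
        pending = [t for t in asyncio.all_tasks() if t is not current]
        if not pending:
            return True
        if time.monotonic() >= deadline:
            return False
        await asyncio.sleep(poll_interval)


async def demo(work_seconds: float, timeout: float) -> bool:
    """Start one 'in-flight' task, then try to drain within the timeout."""
    task = asyncio.create_task(asyncio.sleep(work_seconds))
    drained = await drain(timeout)
    if not drained:
        # Deadline passed: the in-flight work is dropped.
        task.cancel()
    return drained


if __name__ == "__main__":
    print(asyncio.run(demo(0.1, 2.0)))
```

Unlike the Tornado handler, this sketch excludes the current task from the count, because it polls from inside a coroutine rather than from a plain loop callback.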
To recap what we had tried, that failed (iirc)...

- calling `tornado.httpserver.HTTPServer#close_all_connections`:
  - this wasn't documented, only found via source code
  - it didn't actually do anything in Python 3.6 (even though it's implemented? but it worked in later versions)
  - i.e. when not using a timeout it still didn't wait for the in-flight requests to complete
- checking timeout deadline and `io_loop._callbacks`/`io_loop._timeouts`:
  - this would have helped close the event loop sooner
  - but didn't work once we migrated to latest tornado as internal properties were deprecated
  - couldn't find alternative properties to rely on
  - either way I don't think we should rely on really old tornado versions
- use asyncio's event loop with `run_until_complete` and `shutdown_asyncgens`:
  - didn't work and I'm now not quite sure why.
  - at first it seemed like it was because tornado wraps the asyncio loop
  - and that maybe it was not exposing these methods directly
  - but the following calls all look to return the same event loop instance...
    - `tornado.ioloop.IOLoop.current()`
    - `tornado.ioloop.IOLoop.current().asyncio_loop` << internally where asyncio loop is stored
    - `asyncio.get_event_loop()`
- I also tried updating to Python 3.8 so I could use `asyncio.all_tasks(loop=io_loop)`
  - this function should return all running tasks on the event loop.
  - each http request is a unique event loop 'task'
  - so if we shutdown while there are running tasks, they should be returned
  - unfortunately this just yielded an empty set data structure
  - the `io_loop` variable we pass to the constructor is a tornado event loop (which itself should be the underlying asyncio event loop)
- `all_tasks` works! but only if you omit the `loop` argument
  - this lets asyncio use `get_running_loop` to acquire the event loop
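The `all_tasks` behaviour described in the last bullet can be checked with plain asyncio. This is a small sketch, not taken from the gist: `handler` and `count_in_flight` are hypothetical names, and each `handler` task merely stands in for an HTTP request. Called with no `loop` argument from inside a running loop, `asyncio.all_tasks()` reports the tasks that are still in flight.

```python
import asyncio


async def handler(delay: float) -> str:
    """Stand-in for an in-flight HTTP request (hypothetical)."""
    await asyncio.sleep(delay)
    return "OK"


async def count_in_flight():
    """Return the number of other tasks before and after they complete."""
    tasks = [asyncio.create_task(handler(0.05)) for _ in range(3)]
    await asyncio.sleep(0)  # yield once so the tasks get started
    current = asyncio.current_task()

    # No loop argument: asyncio looks up the running loop itself.
    before = len([t for t in asyncio.all_tasks() if t is not current])

    await asyncio.gather(*tasks)

    # Finished tasks are no longer reported by all_tasks().
    after = len([t for t in asyncio.all_tasks() if t is not current])
    return before, after


if __name__ == "__main__":
    print(asyncio.run(count_in_flight()))
```

The current task is filtered out here because the check runs inside a coroutine, which is itself a task on the loop.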
#!/usr/bin/env python
"""
How to use it:

1. Just `kill -2 PROCESS_ID` or `kill -15 PROCESS_ID`.
   The Tornado web server will shut down after processing all the requests.
2. When you run it behind Nginx, it can gracefully reboot your production server.

> Note: the original version of this code didn't account for closing in-flight requests. There is a method called `close_all_connections` (not documented other than by sifting through the tornado source code for its HTTPServer class) that supposedly handles this, but it didn't seem to work when I tested it (not sure why :shrugs:). The version below does try to account for in-flight work by checking the internal properties `_callbacks` or `_timeouts` (which are only available in older versions of tornado, so the code doesn't work for version 5+).

See alternative approach (uses tornado.gen.sleep instead of ioloop ticks):
https://github.com/tornadoweb/tornado/issues/1791#issuecomment-409258371

    async def shutdown():
        logging.info('Stopping http server')
        server.stop()
        await server.close_all_connections()
        logging.info('Will shutdown in %s seconds ...', SHUTDOWN_TIMEOUT)
        await tornado.gen.sleep(SHUTDOWN_TIMEOUT)
        ioloop.IOLoop.current().stop()

> Note: a lot of these examples are problematic or don't quite work because tornado's IOLoop and asyncio's event loop are not interchangeable. In Tornado version 5.0+ the IOLoop became a wrapper around asyncio's loop, but even then it only exposes a small number of methods that proxy to the underlying loop, so methods like `run_until_complete` and `shutdown_asyncgens` (see the example below) just don't work.

    def shutdown():
        logging.info('Stopping http server')
        server.stop()
        io_loop.run_until_complete(io_loop.shutdown_asyncgens())
        io_loop.close()

The following are all the same event loop instance...

- tornado.ioloop.IOLoop.current()
- tornado.ioloop.IOLoop.current().asyncio_loop
- asyncio.get_event_loop()

## UPDATE

The best I've been able to achieve with latest tornado (6.0.3) is...

    def sig_handler(server, sig, frame):
        io_loop = IOLoop.current()

        async def shutdown():
            logging.info('stop listening for new connections')
            server.stop()
            logging.info(f'stopping event loop in {SHUTDOWN_TIMEOUT} seconds')
            await tornado.gen.sleep(SHUTDOWN_TIMEOUT)
            io_loop.stop()
            logging.info('event loop stopped')

        io_loop.add_callback_from_signal(shutdown)

## UPDATE 2 -- BETTER SOLUTION

    def sig_handler(server, sig, frame):
        io_loop = IOLoop.current()

        def stop_loop(deadline):
            now = time.time()
            logging.info(f'len(asyncio.all_tasks()): {len(asyncio.all_tasks())}')
            if now < deadline and len(asyncio.all_tasks()) > 0:
                logging.info('waiting for next event loop tick')
                io_loop.add_timeout(now + 1, stop_loop, deadline)
            else:
                io_loop.stop()
                logging.info('event loop stopped')

        async def shutdown():
            logging.info('stop listening for new connections')
            server.stop()
            logging.info(f'stopping event loop in {SHUTDOWN_TIMEOUT} seconds')
            # await server.close_all_connections() << don't use unless you WANT to kill all client connections immediately
            stop_loop(time.time() + SHUTDOWN_TIMEOUT)

        io_loop.add_callback_from_signal(shutdown)
"""
import time
import signal
import logging
from functools import partial

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado.options import define, options

define("port", default=8888, help="run on the given port", type=int)

MAX_WAIT_SECONDS_BEFORE_SHUTDOWN = 3


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")


def sig_handler(server, sig, frame):
    io_loop = tornado.ioloop.IOLoop.instance()

    def stop_loop(deadline):
        now = time.time()
        if now < deadline and (io_loop._callbacks or io_loop._timeouts):
            logging.info('Waiting for next tick')
            io_loop.add_timeout(now + 1, stop_loop, deadline)
        else:
            io_loop.stop()
            logging.info('Shutdown finally')

    async def shutdown():
        logging.info('Stopping http server')
        server.stop()
        logging.info('Will shutdown in %s seconds ...',
                     MAX_WAIT_SECONDS_BEFORE_SHUTDOWN)
        await server.close_all_connections()
        stop_loop(time.time() + MAX_WAIT_SECONDS_BEFORE_SHUTDOWN)

    logging.warning('Caught signal: %s', sig)
    io_loop.add_callback_from_signal(shutdown)


def main():
    tornado.options.parse_command_line()
    application = tornado.web.Application([
        (r"/", MainHandler),
    ])
    server = tornado.httpserver.HTTPServer(application)
    server.listen(options.port)
    signal.signal(signal.SIGTERM, partial(sig_handler, server))
    signal.signal(signal.SIGINT, partial(sig_handler, server))
    tornado.ioloop.IOLoop.instance().start()
    logging.info("Exit...")


if __name__ == "__main__":
    main()
`bf_tornado.signals` module
"""signals module provides helper functions for tornado graceful shutdown."""
import asyncio
import functools
import signal
import time
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
SHUTDOWN_TIMEOUT = 30
async def shutdown(server: HTTPServer, timeout: int, io_loop: IOLoop):
    """Stop HTTPServer and schedule stopping of ioloop."""
    # stop listening for new connections
    server.stop()

    # schedule ioloop shutdown
    deadline = time.time() + timeout
    graceful_shutdown(deadline, io_loop)


def graceful_shutdown(deadline: float, io_loop: IOLoop):
    """Gracefully shutdown ioloop by allowing tasks to complete by deadline."""
    now = time.time()
    tasks = asyncio.all_tasks()

    if now < deadline and len(tasks) > 0:
        # defer shutdown until all tasks have a chance to complete
        io_loop.add_timeout(now + 1, graceful_shutdown, deadline, io_loop)
    else:
        stop_loop(io_loop)


def stop_loop(io_loop: IOLoop):
    """Stop the ioloop.

    This oneline function isn't inlined as it allows for simpler mocking within
    our bf_tornado test suite.
    """
    io_loop.stop()


def sig_handler(server: HTTPServer, timeout: int, sig, frame):
    """Schedules ioloop shutdown after specified timeout when TERM/INT signals received.

    In-flight tasks running on the asyncio event loop will be given the
    opportunity to finish before the loop is shutdown after specified timeout.

    Expects to be initiated using partial application:
        functools.partial(sig_handler, HTTPServer(), timeout)

    This partial application is typically handled by signals.sig_listener.
    """
    io_loop = IOLoop.current()

    # execute callback on next event loop tick
    io_loop.add_callback_from_signal(shutdown, server, timeout, io_loop)


def sig_listener(server: HTTPServer, timeout: int = 0):
    """Configures listeners for TERM/INT signals.

    Timeout should be a positive integer, otherwise a default will be provided.
    """
    if not timeout:
        timeout = SHUTDOWN_TIMEOUT

    p = functools.partial(sig_handler, server, timeout)
    signal.signal(signal.SIGTERM, p)
    signal.signal(signal.SIGINT, p)
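The partial application used by `sig_listener` can be illustrated on its own. This is a standalone sketch under stated assumptions: the handler here only records its arguments, `"fake-server"` is a placeholder string rather than a real `HTTPServer`, and the handler is invoked directly instead of being registered with `signal.signal`. Python calls a signal handler with `(signum, frame)`, so `functools.partial` is what lets the extra leading arguments (`server`, `timeout`) be bound ahead of time.

```python
import functools
import signal

# Records each invocation so the bound arguments can be inspected.
calls = []


def sig_handler(server, timeout, sig, frame):
    """Hypothetical stand-in that records what it was called with."""
    calls.append((server, timeout, sig))


# Bind the leading arguments; the result accepts (sig, frame), which is
# the signature signal.signal() expects of a handler.
handler = functools.partial(sig_handler, "fake-server", 30)

# Simulate the interpreter delivering SIGTERM (no real signal is sent).
handler(signal.SIGTERM, None)
```

Calling the partial directly like this is also a convenient way to unit test a handler without sending a real signal to the test process.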
import asyncio
import os
import signal
import threading
import time
from unittest import mock
import bf_metrics
import bf_tornado.handlers
import bf_tornado.signals
import tornado.gen
import tornado.ioloop
import tornado.testing
import tornado.util
import tornado.web


class TestGracefulShutdown(tornado.testing.AsyncHTTPTestCase):
    """Verify server allows in-flight requests to complete before shutdown.

    Note: we're using tornado's `yield` abstraction instead of proper
    async/await syntax because of the use of the `gen_test` decorator to work
    around the fact the ioloop isn't running when running the test suite.

    This also means we need to use the `get_url` abstraction function to
    convert the requested path into a fully qualified path to the locally
    running web server.
    """

    def get_app(self):
        class FooHandler(bf_tornado.handlers.BaseHandler):
            metrics = bf_metrics.Metrics(namespace='foo', host='localhost')

            async def get(self):
                await asyncio.sleep(3)
                self.finish('OK')

        return tornado.web.Application([
            (r'/', FooHandler)
        ])

    def tearDown(self):
        """Ensure we verify the `stop_loop` mock was called."""
        try:
            self.mock_stop_loop.assert_called()
        except AssertionError:
            self.fail("mock_stop_loop assertion failed")

    @mock.patch("bf_tornado.signals.stop_loop")
    @tornado.testing.gen_test
    def test_inflight_completed(self, mock_stop_loop):
        """verify in-flight request completes after SIGINT is received.

        The test flow is:
            - make network request
            - issue interrupt signal (via separate thread)
            - network request completes before ioloop shutdown timeout

        We mock the `stop_loop` function which is what would normally trigger
        the ioloop to be stopped. So although we issue a SIGINT it won't
        actually stop the ioloop that is running this test.

        The request flow means the web server request will complete BEFORE the
        mocked `stop_loop` function has been called. This is a problem because
        the test function will finish executing and thus the assertion check at
        that time is likely to fail.

        To solve this problem, we sleep to enable enough time for the
        `stop_loop` function to be called, then we assign the mock to the class
        so that we can reference it within the tearDown function. That gives
        our asynchronous task time to be called.
        """
        # ensure ioloop waits long enough for request to complete
        shutdown_timeout = 10
        bf_tornado.signals.sig_listener(self.http_server, shutdown_timeout)

        pid = os.getpid()

        def trigger_signal():
            # defer SIGINT long enough to allow HTTP request to tornado server
            time.sleep(1)
            os.kill(pid, signal.SIGINT)

        thread = threading.Thread(target=trigger_signal)
        thread.daemon = True
        thread.start()

        resp = yield self.http_client.fetch(self.get_url('/'))
        assert resp.code == 200

        # this sleep causes tornado's ioloop to context switch back to active
        # when this test function finishes (and before the tearDown is
        # triggered), thus allowing the async task to call the stop_loop mock.
        time.sleep(1)

        # we can't assert the mock was called from this test function as the
        # test function itself is blocking the ioloop background task.
        #
        # the reason it blocks is because the http request is designed to
        # finish before the shutdown timeout and so once we're back inside the
        # test function we cannot context switch back to the asyncio task that
        # is trying to verify if it should call `stop_loop`.
        #
        # this means we must assert against the mock within a tearDown function.
        self.mock_stop_loop = mock_stop_loop

    @mock.patch("bf_tornado.signals.stop_loop")
    @tornado.testing.gen_test
    def test_inflight_dropped(self, mock_stop_loop):
        """verify in-flight request is dropped after SIGINT is received.

        The test flow is:
            - make network request
            - issue interrupt signal (via separate thread)
            - ioloop shuts down before network request completes

        We mock the `stop_loop` function which is what would normally trigger
        the ioloop to be stopped. So although we issue a SIGINT it won't
        actually stop the ioloop that is running this test.

        The request flow means the web server request will NOT complete before
        the mocked `stop_loop` function has been called.
        """
        # reset mock property
        self.mock_stop_loop = mock_stop_loop

        # ensure ioloop does NOT wait long enough for request to complete
        shutdown_timeout = 1
        bf_tornado.signals.sig_listener(self.http_server, shutdown_timeout)

        pid = os.getpid()

        def trigger_signal():
            # defer SIGINT long enough to allow HTTP request to tornado server
            time.sleep(1)
            os.kill(pid, signal.SIGINT)

        thread = threading.Thread(target=trigger_signal)
        thread.daemon = True
        thread.start()

        resp = yield self.http_client.fetch(self.get_url('/'))
        assert resp.code == 200

        # we can assert the mock was called here because the background asyncio
        # task has completed (i.e. called `stop_loop`) by the time the http
        # request has completed.
        mock_stop_loop.assert_called()