Skip to content

Instantly share code, notes, and snippets.

@Integralist
Last active December 13, 2019 16:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Integralist/53e926c454e34cb76445c228ded41e95 to your computer and use it in GitHub Desktop.
Save Integralist/53e926c454e34cb76445c228ded41e95 to your computer and use it in GitHub Desktop.
[Python Tornado Graceful Shutdown] #python #python3 #tornado #graceful #shutdown #signals #sigterm #sigint
  • Things tried and explanation of the problem/solution.
  • OLD CODE TESTING SCRATCH PAD -- Python Tornado Graceful Shutdown.py
  • Working Example -- Python 3.7 Tornado 5+ Example with tests.md

Working code (Python 3.7+)...

def sig_handler(server, sig, frame):
    io_loop = IOLoop.current()
    
    def stop_loop(deadline):
        now = time.time()
        if now < deadline and len(asyncio.all_tasks()) > 0:
            logging.info('waiting for next event loop tick')
            io_loop.add_timeout(now + 1, stop_loop, deadline)
        else:
            io_loop.stop()
            logging.info('event loop stopped')
            
    async def shutdown():
        logging.info('stop listening for new connections')
        server.stop()
        logging.info(f'stopping event loop in {SHUTDOWN_TIMEOUT} seconds')
        # await server.close_all_connections() << don't use unless you WANT to kill all client connections immediately
        stop_loop(time.time() + SHUTDOWN_TIMEOUT)
        
    io_loop.add_callback_from_signal(shutdown)

To recap what we had tried, that failed (iirc)...

  • calling tornado.httpserver.HTTPServer#close_all_connections:
    this wasn't documented, only found via source code
    it didn't actually do anything in Python 3.6 (even though its implemented? but it worked in later versions)
    i.e. when not using a timeout it still didn't wait for the in-flight requests to complete

  • checking timeout deadline and io_loop._callbacks/io_loop._timeouts:
    this would have helped close the event loop sooner
    but didn't work once we migrated to latest tornado as internal properties were deprecated
    couldn't find alternative properties to rely on
    either way I don't think we should rely on really old tornado versions

  • use asyncio's event loop with run_until_complete and shutdown_asyncgens:
    didn't work and I'm now not quite sure why.
    at first it seemed like it was because tornado wraps the asyncio loop
    and that maybe it was not exposing these methods directly
    but the following calls all look to return the same event loop instance...

    • tornado.ioloop.IOLoop.current()
    • tornado.ioloop.IOLoop.current().asyncio_loop << internally where asyncio loop is stored
    • asyncio.get_event_loop()
  • I also tried updating to Python 3.8 so I could use asyncio.all_tasks(loop=io_loop)
    this function should return all running tasks on the event loop.
    each http request is a unique event loop 'task'
    so if we shutdown while there are running tasks, they should be returned
    unfortunately this just yielded an empty set data structure
    the io_loop variable we pass to the constructor is a tornado event loop (which itself should be the underlying asyncio event loop)

  • all_tasks works! but only if you omit the loop argument
    this lets asyncio use get_running_loop to acquire the event loop

#!/usr/bin/env python
"""
How to use it:
1. Just `kill -2 PROCESS_ID` or `kill -15 PROCESS_ID`,
The Tornado Web Server Will shutdown after process all the request.
2. When you run it behind Nginx, it can graceful reboot your production server.
> Note: the original version of this code didn't account for closing in-flight requests. There is a method (not documented other than sifting through the tornado source code for its HTTPServer class) that supposedly handles this, called `close_all_connections` but that doesn't seem to work when I tested it (not sure why :shrugs:). But the below version does try and account for checking internal properties `_callbacks` or `_timeouts` (which are only available for older versions of tornado, so the code doesn't work for version 5+).
See alternative approach (uses tornado.gen.sleep instead of ioloop ticks):
https://github.com/tornadoweb/tornado/issues/1791#issuecomment-409258371
async def shutdown():
logging.info('Stopping http server')
server.stop()
await server.close_all_connections()
logging.info('Will shutdown in %s seconds ...', SHUTDOWN_TIMEOUT)
await tornado.gen.sleep(SHUTDOWN_TIMEOUT)
ioloop.IOLoop.current().stop()
> Note: a lot of these examples are problematic or don't quite work due to the incompatible nature of tornado's IOLoop vs asyncio's. In Tornado version 5.0+ their IOLoop became a wrapper around asyncio's but even then they only expose a small number of methods that proxy to the underlying IOLoop, so methods like `run_until_complete` and `shutdown_asyncgens` (see below example) just don't work.
def shutdown():
logging.info('Stopping http server')
server.stop()
io_loop.run_until_complete(io_loop.shutdown_asyncgens())
io_loop.close()
The following are all the same event loop instance...
- tornado.ioloop.IOLoop.current()
- tornado.ioloop.IOLoop.current().asyncio_loop
- asyncio.get_event_loop()
## UPDATE
The best I've been able to achieve with latest tornado (6.0.3) is...
def sig_handler(server, sig, frame):
io_loop = IOLoop.current()
async def shutdown():
logging.info('stop listening for new connections')
server.stop()
logging.info(f'stopping event loop in {SHUTDOWN_TIMEOUT} seconds')
await tornado.gen.sleep(SHUTDOWN_TIMEOUT)
io_loop.stop()
logging.info('event loop stopped')
io_loop.add_callback_from_signal(shutdown)
## UPDATE 2 -- BETTER SOLUTION
def sig_handler(server, sig, frame):
io_loop = IOLoop.current()
def stop_loop(deadline):
now = time.time()
logging.info(f'len(asyncio.all_tasks()): {len(asyncio.all_tasks())}')
if now < deadline and len(asyncio.all_tasks()) > 0:
logging.info('waiting for next event loop tick')
io_loop.add_timeout(now + 1, stop_loop, deadline)
else:
io_loop.stop()
logging.info('event loop stopped')
async def shutdown():
logging.info('stop listening for new connections')
server.stop()
logging.info(f'stopping event loop in {SHUTDOWN_TIMEOUT} seconds')
# await server.close_all_connections() << don't use unless you WANT to kill all client connections immediately
stop_loop(time.time() + SHUTDOWN_TIMEOUT)
io_loop.add_callback_from_signal(shutdown)
"""
import time
import signal
import logging
from functools import partial
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado.options import define, options
define("port", default=8888, help="run on the given port", type=int)
MAX_WAIT_SECONDS_BEFORE_SHUTDOWN = 3
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world")
def sig_handler(server, sig, frame):
io_loop = tornado.ioloop.IOLoop.instance()
def stop_loop(deadline):
now = time.time()
if now < deadline and (io_loop._callbacks or io_loop._timeouts):
logging.info('Waiting for next tick')
io_loop.add_timeout(now + 1, stop_loop, deadline)
else:
io_loop.stop()
logging.info('Shutdown finally')
async def shutdown():
logging.info('Stopping http server')
server.stop()
logging.info('Will shutdown in %s seconds ...',
MAX_WAIT_SECONDS_BEFORE_SHUTDOWN)
await server.close_all_connections()
stop_loop(time.time() + MAX_WAIT_SECONDS_BEFORE_SHUTDOWN)
logging.warning('Caught signal: %s', sig)
io_loop.add_callback_from_signal(shutdown)
def main():
tornado.options.parse_command_line()
application = tornado.web.Application([
(r"/", MainHandler),
])
server = tornado.httpserver.HTTPServer(application)
server.listen(options.port)
signal.signal(signal.SIGTERM, partial(sig_handler, server))
signal.signal(signal.SIGINT, partial(sig_handler, server))
tornado.ioloop.IOLoop.instance().start()
logging.info("Exit...")
if __name__ == "__main__":
main()

Code

bf_tornado.signals module

"""signals module provides helper functions for tornado graceful shutdown."""

import asyncio
import functools
import signal
import time

from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop

SHUTDOWN_TIMEOUT = 30


async def shutdown(server: HTTPServer, timeout: int, io_loop: IOLoop):
    """Stop HTTPServer and schedule stopping of ioloop."""

    # stop listening for new connections
    server.stop()

    # schedule ioloop shutdown
    deadline = time.time() + timeout
    graceful_shutdown(deadline, io_loop)


def graceful_shutdown(deadline: float, io_loop: IOLoop):
    """Gracefully shutdown ioloop by allowing tasks to complete by deadline."""

    now = time.time()
    tasks = asyncio.all_tasks()

    if now < deadline and len(tasks) > 0:
        # defer shutdown until all tasks have a chance to complete
        io_loop.add_timeout(now + 1, graceful_shutdown, deadline, io_loop)
    else:
        stop_loop(io_loop)


def stop_loop(io_loop: IOLoop):
    """Stop the ioloop.

    This oneline function isn't inlined as it allows for simpler mocking within
    our bf_tornado test suite.
    """

    io_loop.stop()


def sig_handler(server: HTTPServer, timeout: int, sig, frame):
    """Schedules ioloop shutdown after specified timeout when TERM/INT signals received.

    In-flights tasks running on the asyncio event loop will be given the
    opportunity to finish before the loop is shutdown after specified timeout.

    Expects to be initiated using partial application:
        functools.partial(sig_handler, HTTPServer())
    This partial application is typically handled by signals.sig_listener.
    """

    io_loop = IOLoop.current()

    # execute callback on next event loop tick
    io_loop.add_callback_from_signal(shutdown, server, timeout, io_loop)


def sig_listener(server: HTTPServer, timeout: int = 0):
    """Configures listeners for TERM/INT signals.

    Timeout should be a positive integer, otherwise a default will be provided.
    """

    if not timeout:
        timeout = SHUTDOWN_TIMEOUT

    p = functools.partial(sig_handler, server, timeout)

    signal.signal(signal.SIGTERM, p)
    signal.signal(signal.SIGINT, p)

Tests

import asyncio
import os
import signal
import threading
import time
from unittest import mock

import bf_metrics

import bf_tornado.handlers
import bf_tornado.signals

import tornado.gen
import tornado.ioloop
import tornado.testing
import tornado.util
import tornado.web


class TestGracefulShutdown(tornado.testing.AsyncHTTPTestCase):
    """Verify server allows in-flight requests to complete before shutdown.

    Note: we're using tornado's `yield` abstraction instead of proper
    async/await syntax because of the use of `gen_test` decorator to work
    around fact the ioloop isn't running when running the test suite.

    This also means we need to use the `get_url` abstraction function to
    convert the requested path into a fully qualified path to the locally
    running web server.
    """
    def get_app(self):
        class FooHandler(bf_tornado.handlers.BaseHandler):
            metrics = bf_metrics.Metrics(namespace='foo', host='localhost')

            async def get(self):
                await asyncio.sleep(3)
                self.finish('OK')

        return tornado.web.Application([
            (r'/', FooHandler)
        ])

    def tearDown(self):
        """Ensure we verify the `stop_loop` mock was called."""

        try:
            self.mock_stop_loop.assert_called()
        except AssertionError:
            self.fail("mock_stop_loop assertion failed")

    @mock.patch("bf_tornado.signals.stop_loop")
    @tornado.testing.gen_test
    def test_inflight_completed(self, mock_stop_loop):
        """verify in-flight request completes after SIGINT is received.

        The test flow is:

            - make network request
            - issue interrupt signal (via separate thread)
            - network request completes before ioloop shutdown timeout

        We mock the `stop_loop` function which is what would normally trigger
        the ioloop to be stopped. So although we issue a SIGINT it won't
        actually stop the ioloop that is running this test.

        The request flow means the web server request will complete BEFORE the
        mocked `stop_loop` function has been called. This is a problem because
        the test function will finish executing and thus the assertion check at
        that time is likely to fail.

        To solve this problem, we sleep to enable enough time for the
        `stop_loop` function to be called, then we assign the mock to the class
        so that we can reference it within the tearDown function. That gives
        our asynchronous task time to be called.
        """

        # ensure ioloop waits long enough for request to complete
        shutdown_timeout = 10
        bf_tornado.signals.sig_listener(self.http_server, shutdown_timeout)

        pid = os.getpid()

        def trigger_signal():
            # defer SIGNINT long enough to allow HTTP request to tornado server
            time.sleep(1)
            os.kill(pid, signal.SIGINT)

        thread = threading.Thread(target=trigger_signal)
        thread.daemon = True
        thread.start()

        resp = yield self.http_client.fetch(self.get_url('/'))
        assert resp.code == 200

        # this sleep causes tornado's ioloop to context switch back to active
        # when this test function finishes (and before the tearDown) is
        # triggered, thus allowing the async task to call the stop_loop mock.
        time.sleep(1)

        # we can't assert the mock was called from this test function as the
        # test function itself is blocking the ioloop background task.
        #
        # the reason it blocks is because the http request is designed to
        # finish before the shutdown timeout and so once we're back inside the
        # test function we cannot context switch back to the asyncio task that
        # is trying to verify if it should call `stop_loop`.
        #
        # this means we must assert against the mock within a tearDown function.
        self.mock_stop_loop = mock_stop_loop

    @mock.patch("bf_tornado.signals.stop_loop")
    @tornado.testing.gen_test
    def test_inflight_dropped(self, mock_stop_loop):
        """verify in-flight request is dropped after SIGINT is received.

        The test flow is:

            - make network request
            - issue interrupt signal (via separate thread)
            - ioloop shuts down before network request completes

        We mock the `stop_loop` function which is what would normally trigger
        the ioloop to be stopped. So although we issue a SIGINT it won't
        actually stop the ioloop that is running this test.

        The request flow means the web server request will NOT complete before
        the mocked `stop_loop` function has been called.
        """

        # reset mock property
        self.mock_stop_loop = mock_stop_loop

        # ensure ioloop does NOT wait long enough for request to complete
        shutdown_timeout = 1
        bf_tornado.signals.sig_listener(self.http_server, shutdown_timeout)

        pid = os.getpid()

        def trigger_signal():
            # defer SIGNINT long enough to allow HTTP request to tornado server
            time.sleep(1)
            os.kill(pid, signal.SIGINT)

        thread = threading.Thread(target=trigger_signal)
        thread.daemon = True
        thread.start()

        resp = yield self.http_client.fetch(self.get_url('/'))
        assert resp.code == 200

        # we can assert the mock was called here because the background asyncio
        # task has completed (i.e. called `stop_loop`) by the time the http
        # request has completed.
        mock_stop_loop.assert_called()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment