@methane
Created March 24, 2012 17:28
Tornado Example: Delegating a blocking task to a worker thread pool from an asynchronous request handler
from time import sleep
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.web import Application, asynchronous, RequestHandler
from multiprocessing.pool import ThreadPool

# Pool of threads that run the blocking calls
_workers = ThreadPool(10)


def run_background(func, callback, args=(), kwds={}):
    def _callback(result):
        # Called off the IOLoop thread, so hand the result back to the IOLoop
        IOLoop.instance().add_callback(lambda: callback(result))
    _workers.apply_async(func, args, kwds, _callback)


# Blocking task, such as a MySQL query
def blocking_task(n):
    sleep(n)
    return n


class Handler(RequestHandler):
    @asynchronous
    def get(self):
        run_background(blocking_task, self.on_complete, (10,))

    def on_complete(self, res):
        self.write("Test {0}<br/>".format(res))
        self.finish()


HTTPServer(Application([("/", Handler)], debug=True)).listen(8888)
IOLoop.instance().start()

@evertheylen

@gwillem's example didn't work for me unless I changed asyncio.wrap_future into to_tornado_future. I made a decorator @blocking that works like @run_on_executor but uses the await syntax. Example:

import time
from functools import wraps
from concurrent.futures import ThreadPoolExecutor

import tornado
import tornado.ioloop
import tornado.gen
from tornado.platform.asyncio import to_tornado_future

io_loop = tornado.ioloop.IOLoop.current()

def blocking(method):
    """Wraps the method in an async method, and executes the function on `self.executor`."""
    @wraps(method)
    async def wrapper(self, *args, **kwargs):
        fut = self.executor.submit(method, self, *args, **kwargs)
        return await to_tornado_future(fut)
    return wrapper

class SomeClass:
    executor = ThreadPoolExecutor(4)

    @blocking
    def block_for_long_time(self):
        print("  > Starting with blocking...")
        time.sleep(1)
        print("  > Stopping with blocking...")
        return 3.1415

    async def test(self):
        """Emulates concurrent requests."""
        for i in range(4):
            io_loop.spawn_callback(self.spawn_me)
        print(">>> Waiting a while...")
        # Give the callbacks enough time to finish
        await tornado.gen.sleep(2)
        print(">>> Done.")

    async def spawn_me(self):
        """Emulates handling of a single request."""
        print(" >> spawned!")
        res = await self.block_for_long_time()
        print(" >> got", res)

io_loop.run_sync(SomeClass().test)

Expected output (the order of the 'Stopping ...' and 'got ...' lines can differ):

>>> Waiting a while...
 >> spawned!
  > Starting with blocking...
 >> spawned!
  > Starting with blocking...
 >> spawned!
  > Starting with blocking...
 >> spawned!
  > Starting with blocking...
  > Stopping with blocking...
  > Stopping with blocking...
 >> got 3.1415
  > Stopping with blocking...
 >> got 3.1415
 >> got 3.1415
  > Stopping with blocking...
 >> got 3.1415
>>> Done.

@lucasrcosta

Thank you @mclate. I have made a decorator out of this: https://gist.github.com/lucascosta/4ddd0afadfee75398536cb4125a8732b

@danmackinlay

Note for other travellers - ThreadPoolExecutor works with @run_on_executor but ProcessPoolExecutor does not, because ProcessPoolExecutor requires everything it sends to a worker process to be serializable (picklable), and the calling class is not. This confused me for a while.
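
The pickling constraint can be checked without a process pool at all. The sketch below is only an illustration under assumptions: `Service` and `picklable` are hypothetical names, and `Service` keeps its executor on the instance as a stand-in for a handler that carries unpicklable state. It compares a bound method with a plain module-level function (`time.sleep`).

```python
import pickle
import time
from concurrent.futures import ThreadPoolExecutor


class Service:
    # Hypothetical class: holds an executor (and therefore locks and queues)
    # on the instance, much like a handler holds connections and requests.
    def __init__(self):
        self.executor = ThreadPoolExecutor(2)

    def work(self, n):
        return n * 2


def picklable(obj):
    """Return True if obj survives pickle.dumps, which a process pool needs."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


svc = Service()
# A bound method is pickled together with `self`, including its executor.
bound_ok = picklable(svc.work)
# A module-level function is pickled by reference (module plus name).
plain_ok = picklable(time.sleep)
svc.executor.shutdown()

print(bound_ok)  # False
print(plain_ok)  # True
```

If a process pool is needed, one option is to submit a module-level function with plain data arguments rather than a method of the handler.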

@mdutkin

mdutkin commented Dec 21, 2018

It seems there were some changes related to Futures in Tornado 5+, so the decorator above didn't work for me. I've modified it to:

def blocking(method):
    """
    Wraps the method in an async method, and executes the function on `self.executor`
    """

    @wraps(method)
    async def wrapper(self, *args, **kwargs):
        def work():
            return method(self, *args, **kwargs)
        return await io_loop.run_in_executor(self.executor, work)

    return wrapper
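
For readers who want to try the same idea without the rest of the thread's setup, here is a minimal self-contained sketch. It is a variation, not the code above: it swaps the module-level `io_loop` for the standard library's `asyncio.get_running_loop()`, on the assumption that Tornado 5+ runs on the asyncio event loop under Python 3. The `seconds` parameter and `main` are hypothetical additions for the demo.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial, wraps


def blocking(method):
    """Run `method` on `self.executor` and make the call awaitable."""
    @wraps(method)
    async def wrapper(self, *args, **kwargs):
        loop = asyncio.get_running_loop()
        # run_in_executor takes positional arguments only, so bind
        # everything (including kwargs) with functools.partial.
        call = partial(method, self, *args, **kwargs)
        return await loop.run_in_executor(self.executor, call)
    return wrapper


class SomeClass:
    executor = ThreadPoolExecutor(4)

    @blocking
    def block_for_long_time(self, seconds=0.2):
        time.sleep(seconds)  # stands in for a blocking call
        return 3.1415


async def main():
    obj = SomeClass()
    # Emulate four concurrent requests sharing the four worker threads.
    return await asyncio.gather(
        *(obj.block_for_long_time() for _ in range(4)))


results = asyncio.run(main())
print(results)  # [3.1415, 3.1415, 3.1415, 3.1415]
```

Looking up the running loop inside the wrapper avoids capturing a loop at import time, which matters if the module is imported before the loop is created.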
