Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Tornado Example: Delegating an blocking task to a worker thread pool from an asynchronous request handler
from time import sleep
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.web import Application, asynchronous, RequestHandler
from multiprocessing.pool import ThreadPool
_workers = ThreadPool(10)
def run_background(func, callback, args=(), kwds={}):
def _callback(result):
IOLoop.instance().add_callback(lambda: callback(result))
_workers.apply_async(func, args, kwds, _callback)
# blocking task like querying to MySQL
def blocking_task(n):
sleep(n)
return n
class Handler(RequestHandler):
@asynchronous
def get(self):
run_background(blocking_task, self.on_complete, (10,))
def on_complete(self, res):
self.write("Test {0}<br/>".format(res))
self.finish()
HTTPServer(Application([("/", Handler)],debug=True)).listen(8888)
IOLoop.instance().start()
@mclate

This comment has been minimized.

Copy link

mclate commented Sep 17, 2014

Comment for lone strangers (as i was here):

import time

from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor   # `pip install futures` for python2

MAX_WORKERS = 4


class Handler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)

    @run_on_executor
    def background_task(self, i):
        """ This will be executed in `executor` pool. """
        time.sleep(10)
        return i

    @tornado.gen.coroutine
    def get(self, idx):
        """ Request that asynchronously calls background task. """
        res = yield self.background_task(idx)
        self.write(res)

Docs: http://tornado.readthedocs.org/en/stable/concurrent.html#tornado.concurrent.run_on_executor

@liuyangc3

This comment has been minimized.

Copy link

liuyangc3 commented Mar 7, 2015

cool

@matztam

This comment has been minimized.

Copy link

matztam commented Apr 4, 2015

Thanks a lot!!

@nezaidu

This comment has been minimized.

Copy link

nezaidu commented Oct 21, 2015

thanks!

@pricco

This comment has been minimized.

Copy link

pricco commented Jan 4, 2016

@mclate thanks!

@emehrkay

This comment has been minimized.

Copy link

emehrkay commented Mar 9, 2016

This saved my life!! Thanks both to @methane and @mclate

@gwillem

This comment has been minimized.

Copy link

gwillem commented Mar 13, 2016

Note that the ioloop argument is only required when you want to use a callback. If you use await, you can also use this:

async def tpexec(executor, fn, *args, **kwargs):
    fut = executor.submit(fn, *args, **kwargs)
    return await asyncio.wrap_future(fut)

And then you can do:

await tpexec(executor, orm.object.save)
@evertheylen

This comment has been minimized.

Copy link

evertheylen commented Mar 28, 2016

@gwillem's example didn't work for me unless I changed asyncio.wrap_future into to_tornado_future. I made a decorator @blocking that works like @run_on_executor but uses the await syntax. Example:

import time
from functools import wraps
from concurrent.futures import ThreadPoolExecutor

import tornado
import tornado.ioloop
import tornado.gen
from tornado.platform.asyncio import to_tornado_future

io_loop = tornado.ioloop.IOLoop.current()

def blocking(method):
    """Wraps the method in an async method, and executes the function on `self.executor`."""
    @wraps(method)
    async def wrapper(self, *args, **kwargs):
        fut = self.executor.submit(method, self, *args, **kwargs)
        return await to_tornado_future(fut)
    return wrapper

class SomeClass:
    executor = ThreadPoolExecutor(4)

    @blocking
    def block_for_long_time(self):
        print("  > Starting with blocking...")
        time.sleep(1)
        print("  > Stopping with blocking...")
        return 3.1415

    async def test(self):
        """Emulates concurrent requests."""
        for i in range(4):
            io_loop.spawn_callback(self.spawn_me)
        print(">>> Waiting a while...")
        # Give the callbacks enough time to finish
        await tornado.gen.sleep(2)
        print(">>> Done.")

    async def spawn_me(self):
        """Emulates handling of a single request."""
        print(" >> spawned!")
        res = await self.block_for_long_time()
        print(" >> got", res)

io_loop.run_sync(SomeClass().test)

Expected output: (order of lines can differ for 'Stopping ...' and 'got...' lines):

>>> Waiting a while...
 >> spawned!
  > Starting with blocking...
 >> spawned!
  > Starting with blocking...
 >> spawned!
  > Starting with blocking...
 >> spawned!
  > Starting with blocking...
  > Stopping with blocking...
  > Stopping with blocking...
 >> got 3.1415
  > Stopping with blocking...
 >> got 3.1415
 >> got 3.1415
  > Stopping with blocking...
 >> got 3.1415
>>> Done.
@lucasrcosta

This comment has been minimized.

Copy link

lucasrcosta commented Jul 29, 2016

Thank you @mclate. I have made a decorator out of this: https://gist.github.com/lucascosta/4ddd0afadfee75398536cb4125a8732b

@danmackinlay

This comment has been minimized.

Copy link

danmackinlay commented Oct 31, 2016

Note for other travellers - ThreadPoolExecutor works with @run_on_executor but ProcessPoolExecutor does not, because the calling class is not serializeable as required by ProcessPoolExecutor. This confused me for a while.

@mdutkin

This comment has been minimized.

Copy link

mdutkin commented Dec 21, 2018

seems like there are some changes in Tornado 5+ related to Futures, so decorator above didn't work for me. So I've modified it to:

def blocking(method):
    """
    Wraps the method in an async method, and executes the function on `self.executor`
    """

    @wraps(method)
    async def wrapper(self, *args, **kwargs):
        def work():
            return method(self, *args, **kwargs)
        return await io_loop.run_in_executor(self.executor, work)

    return wrapper
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.