-
-
Save methane/2185380 to your computer and use it in GitHub Desktop.
from time import sleep | |
from tornado.httpserver import HTTPServer | |
from tornado.ioloop import IOLoop | |
from tornado.web import Application, asynchronous, RequestHandler | |
from multiprocessing.pool import ThreadPool | |
_workers = ThreadPool(10) | |
def run_background(func, callback, args=(), kwds={}): | |
def _callback(result): | |
IOLoop.instance().add_callback(lambda: callback(result)) | |
_workers.apply_async(func, args, kwds, _callback) | |
# blocking task like querying to MySQL | |
def blocking_task(n): | |
sleep(n) | |
return n | |
class Handler(RequestHandler): | |
@asynchronous | |
def get(self): | |
run_background(blocking_task, self.on_complete, (10,)) | |
def on_complete(self, res): | |
self.write("Test {0}<br/>".format(res)) | |
self.finish() | |
HTTPServer(Application([("/", Handler)],debug=True)).listen(8888) | |
IOLoop.instance().start() |
Thanks a lot!!
thanks!
@mclate thanks!
Note that the ioloop argument is only required when you want to use a callback. If you use await, you can also use this:
async def tpexec(executor, fn, *args, **kwargs):
fut = executor.submit(fn, *args, **kwargs)
return await asyncio.wrap_future(fut)
And then you can do:
await tpexec(executor, orm.object.save)
@gwillem's example didn't work for me unless I changed asyncio.wrap_future
into to_tornado_future
. I made a decorator @blocking
that works like @run_on_executor
but uses the await
syntax. Example:
import time
from functools import wraps
from concurrent.futures import ThreadPoolExecutor
import tornado
import tornado.ioloop
import tornado.gen
from tornado.platform.asyncio import to_tornado_future
io_loop = tornado.ioloop.IOLoop.current()
def blocking(method):
"""Wraps the method in an async method, and executes the function on `self.executor`."""
@wraps(method)
async def wrapper(self, *args, **kwargs):
fut = self.executor.submit(method, self, *args, **kwargs)
return await to_tornado_future(fut)
return wrapper
class SomeClass:
executor = ThreadPoolExecutor(4)
@blocking
def block_for_long_time(self):
print(" > Starting with blocking...")
time.sleep(1)
print(" > Stopping with blocking...")
return 3.1415
async def test(self):
"""Emulates concurrent requests."""
for i in range(4):
io_loop.spawn_callback(self.spawn_me)
print(">>> Waiting a while...")
# Give the callbacks enough time to finish
await tornado.gen.sleep(2)
print(">>> Done.")
async def spawn_me(self):
"""Emulates handling of a single request."""
print(" >> spawned!")
res = await self.block_for_long_time()
print(" >> got", res)
io_loop.run_sync(SomeClass().test)
Expected output: (order of lines can differ for 'Stopping ...' and 'got...' lines):
>>> Waiting a while...
>> spawned!
> Starting with blocking...
>> spawned!
> Starting with blocking...
>> spawned!
> Starting with blocking...
>> spawned!
> Starting with blocking...
> Stopping with blocking...
> Stopping with blocking...
>> got 3.1415
> Stopping with blocking...
>> got 3.1415
>> got 3.1415
> Stopping with blocking...
>> got 3.1415
>>> Done.
Thank you @mclate. I have made a decorator out of this: https://gist.github.com/lucascosta/4ddd0afadfee75398536cb4125a8732b
Note for other travellers - ThreadPoolExecutor
works with @run_on_executor
but ProcessPoolExecutor
does not, because the calling class is not serializeable as required by ProcessPoolExecutor. This confused me for a while.
seems like there are some changes in Tornado 5+ related to Futures, so decorator above didn't work for me. So I've modified it to:
def blocking(method):
"""
Wraps the method in an async method, and executes the function on `self.executor`
"""
@wraps(method)
async def wrapper(self, *args, **kwargs):
def work():
return method(self, *args, **kwargs)
return await io_loop.run_in_executor(self.executor, work)
return wrapper
cool