Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Tornado and Asyncio Mixed example
# -*- coding: utf-8 -*-
import asyncio
import re
import asyncio_redis
import tornado.concurrent
import tornado.httpclient
import tornado.web
import tornado.platform.asyncio
def coroutine(func):
func = asyncio.coroutine(func)
def decorator(*args, **kwargs):
future = tornado.concurrent.Future()
def future_done(f):
try:
future.set_result(f.result())
except Exception as e:
future.set_exception(e)
asyncio.async(func(*args, **kwargs)).add_done_callback(future_done)
return future
return decorator
class MainHandler(tornado.web.RequestHandler):
def initialize(self, redis):
self.redis = redis
@asyncio.coroutine
def _get_dolar(self):
client = tornado.httpclient.AsyncHTTPClient()
resp = yield from tornado.platform.asyncio.to_asyncio_future(
client.fetch('http://dolarhoje.com')
)
dolar = re.search(
r'<input type="text" id="nacional" value="(?P<dolar>[^"]+)"/>',
resp.body.decode('utf-8')
)
if dolar:
return dolar.group(1)
@coroutine
def get(self):
dolar = yield from self.redis.get('dolar')
if not dolar:
dolar = yield from self._get_dolar()
yield from self.redis.set('dolar', dolar, expire=60)
self.write(dolar)
@asyncio.coroutine
def get_redis_connection():
return (yield from asyncio_redis.Connection.create(
host='localhost', port=6379
))
if __name__ == '__main__':
tornado.platform.asyncio.AsyncIOMainLoop().install()
ioloop = asyncio.get_event_loop()
redis = ioloop.run_until_complete(get_redis_connection())
app = tornado.web.Application([
(r"/", MainHandler, dict(redis=redis)),
])
app.listen(8888)
ioloop.run_forever()
@derek-adair

This comment has been minimized.

Copy link

@derek-adair derek-adair commented Feb 18, 2016

Hey this is a good example, thank you.

@drgarcia1986

This comment has been minimized.

Copy link
Owner Author

@drgarcia1986 drgarcia1986 commented Apr 19, 2016

You're welcome 😄

@WGH-

This comment has been minimized.

Copy link

@WGH- WGH- commented Feb 22, 2017

There's an even simpler way:

import asyncio
import functools

from tornado.platform.asyncio import to_tornado_future

def aio_coroutine_to_tornado_gen(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return to_tornado_future(asyncio.ensure_future(func(*args, **kwargs)))
    return wrapper
@archever

This comment has been minimized.

Copy link

@archever archever commented Jan 12, 2018

thank a lot, this is helpfull

@wzyonggege

This comment has been minimized.

Copy link

@wzyonggege wzyonggege commented Dec 4, 2018

I also use uvloop instead of asyncio in Event Loop.

@Ravi2712

This comment has been minimized.

Copy link

@Ravi2712 Ravi2712 commented Feb 11, 2019

This is really helpful.! Thank You! 👍

@gekagal

This comment has been minimized.

Copy link

@gekagal gekagal commented Apr 19, 2020

Please tell me what to do with this code at the top? ((

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment