Skip to content

Instantly share code, notes, and snippets.

@mywaiting
Created July 26, 2019 09:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mywaiting/6ab89ce3eadd68289da96ffa113146cb to your computer and use it in GitHub Desktop.
Save mywaiting/6ab89ce3eadd68289da96ffa113146cb to your computer and use it in GitHub Desktop.
使用 Python Tornado 框架异步调用 Celery 队列任务的实现:一个很简单的小函数,但是很好用。
from celery import Celery
import time
# Celery application for this module: named 'tasks', with an AMQP broker URL
# and a Redis result-backend URL on localhost.
celery = Celery(
    'tasks',
    broker='amqp://',
    backend='redis://localhost',
)
@celery.task
def test(strs):
    """Echo task: hand back ``strs`` exactly as it was received."""
    return strs
# -*- coding: utf-8 -*-
"""
torncelery
~~~~~~~~~~
Assign a Celery task to a Future and retrieve the task result asynchronously.
"""
from tornado.concurrent import TracebackFuture
from tornado.ioloop import IOLoop
def async_task(task, *args, **kwargs):
    """Dispatch a Celery task and return a Tornado future for its result.

    The task is enqueued with ``task.delay(*args, **kwargs)`` and
    ``_on_result`` is scheduled on the IOLoop to resolve the returned future
    once the task result reports that it is ready.

    Args:
        task: Object exposing ``delay(*args, **kwargs)`` (e.g. a Celery task).
        *args: Positional arguments forwarded to ``task.delay``.
        **kwargs: Keyword arguments forwarded to ``task.delay``.  The optional
            ``callback`` keyword is consumed here and is NOT forwarded; when
            truthy, it is invoked with the future's result once the future
            completes.

    Returns:
        The future that will eventually hold the task result.  It can be
        yielded from a Tornado coroutine.
    """
    # Pop ``callback`` first so it never leaks into the task's own kwargs.
    callback = kwargs.pop("callback", None)

    # NOTE(review): TracebackFuture appears to be exported only by older
    # Tornado releases -- confirm against the Tornado version this runs on.
    pending = TracebackFuture()
    loop = IOLoop.instance()

    if callback:
        # ``done`` is the completed future handed back by the IOLoop.
        loop.add_future(pending, lambda done: callback(done.result()))

    async_result = task.delay(*args, **kwargs)
    loop.add_callback(_on_result, async_result, pending)
    return pending


# ``async`` became a reserved keyword in Python 3.7, so ``def async(...)`` is
# a SyntaxError there.  The function is defined under a legal name above and
# still published under its historical name, so existing callers
# (``torncelery.async(...)`` on older interpreters, or
# ``getattr(torncelery, "async")`` on any interpreter) keep working.
globals()["async"] = async_task
def _on_result(result, future, poll_interval=0.01):
    """Resolve ``future`` from a task result, polling until it is ready.

    Args:
        result: Task result handle exposing ``ready()``, ``result`` and (only
            consulted when ``result`` holds an exception) ``successful()``.
        future: Future to complete via ``set_result`` / ``set_exception``.
        poll_interval: Seconds to wait between readiness checks.  Waiting
            between checks avoids re-queuing this function on every IOLoop
            iteration, which would keep the loop spinning and query the
            result backend continuously.

    When the task did not succeed and its ``result`` is an exception
    instance, the exception is propagated through ``future.set_exception`` so
    that code awaiting the future sees a failure rather than receiving the
    exception object as an ordinary value.
    """
    if not result.ready():
        # Not finished yet: check again after a short delay.
        loop = IOLoop.instance()
        loop.add_timeout(
            loop.time() + poll_interval,
            lambda: _on_result(result, future, poll_interval),
        )
        return

    outcome = result.result
    # ``successful()`` is only consulted when the payload is an exception, so
    # ordinary return values need nothing beyond ``ready()`` and ``result``.
    if isinstance(outcome, BaseException) and not result.successful():
        future.set_exception(outcome)
    else:
        future.set_result(outcome)
import tornado.ioloop
import tornado.web
from tornado.gen import coroutine
from tasks import test
import torncelery
class MainHandler(tornado.web.RequestHandler):
    """Demo handler that runs the ``test`` task and writes back its result."""

    @coroutine
    def get(self):
        """Handle GET: dispatch ``test("hello world")`` and echo the result."""
        # ``async`` is a reserved keyword from Python 3.7 on, so the attribute
        # cannot be spelled ``torncelery.async``; look it up by name instead.
        run_task = getattr(torncelery, "async")
        result = yield run_task(test, "hello world")
        # Wrap in a one-element tuple so %-formatting stays correct even when
        # the task result is itself a tuple.
        self.write("%s" % (result,))
# Tornado application with a single route: "/" is served by MainHandler.
application = tornado.web.Application(
    handlers=[
        (r"/", MainHandler),
    ],
)
if __name__ == "__main__":
    # Script entry point: listen on port 8888, then start the IOLoop.
    application.listen(port=8888)
    tornado.ioloop.IOLoop.instance().start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment