Skip to content

Instantly share code, notes, and snippets.

@realityone
Last active April 25, 2017 02:26
Show Gist options
  • Save realityone/0e668e06b7a9a8d0a453 to your computer and use it in GitHub Desktop.
Save realityone/0e668e06b7a9a8d0a453 to your computer and use it in GitHub Desktop.
tornado-rq
# https://github.com/mayflaver/tornado-celery
from rq import Queue
from tornado import stack_context
from tornado.concurrent import TracebackFuture
from tornado.ioloop import IOLoop
class JobFailed(Exception):
    """Set on the future returned by ``AsyncQueue.async`` when the job fails."""
class AsyncQueue(Queue):
def __init__(self, async=True, default_timeout=5, *args, **kwargs):
super(AsyncQueue, self).__init__(async, default_timeout, *args, **kwargs)
self.io_loop = IOLoop.instance()
def async(self, task, callback=None, *args, **kwargs):
future = TracebackFuture()
if callback is not None:
callback = stack_context.wrap(callback)
self.io_loop.add_future(future, lambda future: callback(future.result()))
job = self.enqueue(task, *args, **kwargs)
self.io_loop.add_callback(self._on_result, job, future)
return future
def _on_result(self, job, future):
if job.result is not None:
future.set_result(job.result)
elif job.is_failed is True:
future.set_exception(JobFailed('Job Failed.'))
else:
self.io_loop.add_callback(self._on_result, job, future)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment