Last active
April 25, 2017 02:26
-
-
Save realityone/0e668e06b7a9a8d0a453 to your computer and use it in GitHub Desktop.
tornado-rq
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# https://github.com/mayflaver/tornado-celery | |
from rq import Queue | |
from tornado import stack_context | |
from tornado.concurrent import TracebackFuture | |
from tornado.ioloop import IOLoop | |
class JobFailed(Exception): | |
pass | |
class AsyncQueue(Queue): | |
def __init__(self, async=True, default_timeout=5, *args, **kwargs): | |
super(AsyncQueue, self).__init__(async, default_timeout, *args, **kwargs) | |
self.io_loop = IOLoop.instance() | |
def async(self, task, callback=None, *args, **kwargs): | |
future = TracebackFuture() | |
if callback is not None: | |
callback = stack_context.wrap(callback) | |
self.io_loop.add_future(future, lambda future: callback(future.result())) | |
job = self.enqueue(task, *args, **kwargs) | |
self.io_loop.add_callback(self._on_result, job, future) | |
return future | |
def _on_result(self, job, future): | |
if job.result is not None: | |
future.set_result(job.result) | |
elif job.is_failed is True: | |
future.set_exception(JobFailed('Job Failed.')) | |
else: | |
self.io_loop.add_callback(self._on_result, job, future) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment