Skip to content

Instantly share code, notes, and snippets.

@jxskiss
Last active November 3, 2021 10:29
Show Gist options
  • Save jxskiss/aadf53659837b2052145cfc1c37cc8ea to your computer and use it in GitHub Desktop.
Save jxskiss/aadf53659837b2052145cfc1c37cc8ea to your computer and use it in GitHub Desktop.
Get celery task results within tornado, celery [issues#3577](https://github.com/celery/celery/issues/3577)
# Tornado app.py
from tornado import web, options
from tornado.platform.asyncio import AsyncIOMainLoop
import asyncio
AsyncIOMainLoop().install()
import tasks
from celery_task_utils import CeleryTaskProxy
class AddHandler(web.RequestHandler):
    """Handle ``GET /add`` by running ``tasks.add`` through the celery proxy.

    Expects three integer query arguments: ``x``, ``y`` and ``sleep``.
    """

    @property
    def celery_proxy(self):
        """Return the proxy stored in the application settings."""
        return self.application.settings['celery_proxy']

    async def get(self):
        """Submit ``tasks.add(x, y, sleep)`` and write its result as text.

        Raises:
            web.HTTPError: 400 when an argument is not a valid integer.
                (A missing argument is reported by ``get_argument`` itself.)
        """
        try:
            x, y, sleep = (int(self.get_argument(name))
                           for name in ('x', 'y', 'sleep'))
        except ValueError:
            # A malformed number is a client error, not a server failure.
            raise web.HTTPError(400, 'x, y and sleep must be integers')
        result = await self.celery_proxy.commit(tasks.add, x, y, sleep)
        self.finish(str(result))
# Parse tornado command line flags (logging options etc.) before start-up.
options.parse_command_line()

# The proxy is created once and shared by all handlers through the
# application settings.
app = web.Application(
    [(r'/add', AddHandler)],
    debug=True,
    celery_proxy=CeleryTaskProxy(),
)
app.listen(8000)

loop = asyncio.get_event_loop()
try:
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the server.
    pass
finally:
    # Close the proxy's redis pool before the loop itself is closed.
    loop.run_until_complete(app.settings['celery_proxy'].cleanup())
    loop.close()
# Celery Task Proxy
from aioredis.errors import ChannelClosedError, ConnectionClosedError
from celery.exceptions import ImproperlyConfigured
import aioredis
import asyncio
import json
import logging
import re
_logger = logging.getLogger(__name__)
class CeleryTaskProxy(object):
    """
    Commit celery task using this when you need to retrieve the result within
    Tornado/asyncio loop, ONLY redis backend is supported.

    The proxy subscribes to the redis ``set`` key-event channel of the result
    backend database and resolves a per-task future when the key of a pending
    task is written.

    NOTE: this is not a common case when using celery and is not recommended.
    """

    def __init__(self, celery_app=None, loop=None):
        """Read the result backend URL and connect to redis.

        Args:
            celery_app: celery application whose ``result_backend`` setting
                is used; defaults to celery's current app.
            loop: asyncio event loop to use; defaults to
                ``asyncio.get_event_loop()``. It must not be running yet,
                because ``initialize`` calls ``run_until_complete`` on it.
        """
        if celery_app is None:
            # Only ``celery.exceptions`` is imported at module level, which
            # does not bind the bare name ``celery``; import what we need.
            from celery import current_app
            celery_app = current_app
        self.backend = celery_app.conf['result_backend'] or ''
        self.backend_db = None
        self.pool = None
        # Maps backend key -> future awaited by ``commit``.
        self.pending_tasks = {}
        self.loop = loop or asyncio.get_event_loop()
        # Reference to the listener task so it can be cancelled in cleanup.
        self._events_task = None
        self.initialize()

    def initialize(self):
        """Parse the backend URL, create the pool and start the listener.

        Raises:
            ImproperlyConfigured: if the backend URL is not a supported
                ``redis://`` URL or the server's ``notify-keyspace-events``
                option is insufficient.
        """
        match = re.match(
            r'redis://(?::(\w+)?@)?([\w.]+):(\d+)/(\d{1,2})',
            self.backend
        )
        if not match:
            raise ImproperlyConfigured(
                'redis backend improperly configured: %s' % (self.backend, ))
        password, host, port, db_idx = match.groups()
        self.backend_db = int(db_idx or 0)
        self.pool = self.loop.run_until_complete(
            aioredis.create_pool(
                (host, int(port)),
                db=self.backend_db, password=password,
                minsize=2))
        try:
            self.loop.run_until_complete(self._check_redis_option())
        except Exception:
            # Do not leak the pool when the check fails for any reason.
            self.pool.close()
            self.loop.run_until_complete(self.pool.wait_closed())
            raise
        # Bind the listener to *our* loop and keep a reference to the task.
        self._events_task = self.loop.create_task(self.handle_events())

    async def _check_redis_option(self):
        """Verify that the server publishes the key events we rely on.

        Raises:
            ImproperlyConfigured: unless ``notify-keyspace-events`` contains
                ``E`` together with ``A`` or ``$``.
        """
        with await self.pool as red:
            redis_config = await red.config_get('notify-keyspace-events')
        notify_conf = redis_config.get('notify-keyspace-events')
        _logger.info('redis server notify-keyspace-events config: "%s"',
                     notify_conf)
        if not (notify_conf and 'E' in notify_conf and
                ('A' in notify_conf or '$' in notify_conf)):
            raise ImproperlyConfigured(
                'redis server notify-keyspace-events improperly '
                'configured: "%s"' % (notify_conf, ))

    async def cleanup(self):
        """Close the redis pool and stop the listener task."""
        self.pool.close()
        await self.pool.wait_closed()
        task = self._events_task
        if task is not None and not task.done():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

    async def handle_events(self):
        """Run the subscription loop, reconnecting after failures.

        Returns when the loop or the pool has been closed.
        """
        while not (self.loop.is_closed() or self.pool.closed):
            try:
                await self._handle_events()
            except asyncio.CancelledError:
                # Before Python 3.8 CancelledError derives from Exception;
                # never swallow a cancellation request.
                raise
            except (ChannelClosedError, ConnectionClosedError) as err:
                _logger.warning(
                    'unexpected %s error, retrying to connect to redis '
                    'in 2 seconds', type(err))
                await asyncio.sleep(2)
            except ConnectionRefusedError:
                _logger.warning(
                    'connecting to redis refused, retrying to connect '
                    'in 10 seconds')
                await asyncio.sleep(10)
            except Exception as err:
                if self.loop.is_closed():
                    return
                _logger.exception('unhandled error: %s', err)
                # Back off briefly so a persistent error cannot spin the loop.
                await asyncio.sleep(1)

    async def _handle_events(self):
        """Subscribe to the ``set`` key-event channel and dispatch messages."""
        with await self.pool as sub:
            pattern = '__keyevent@{}__:set'.format(self.backend_db)
            _logger.debug('subscribing channel: %s', pattern)
            channel = (await sub.psubscribe(pattern))[0]
            async for msg in channel.iter():
                _logger.debug('message: %s', msg)
                # presumably msg is (channel name, key that was set) —
                # confirm against the aioredis pattern-channel docs
                key = msg[1]
                if key not in self.pending_tasks:
                    continue
                await self.on_task_result(key)

    async def on_task_result(self, backend_key):
        """Fetch the stored result for ``backend_key`` and resolve its future.

        A future that is missing or already done (for example cancelled by a
        timeout in ``commit``) is ignored. Failures while fetching or
        decoding are set on the future instead of being raised, so one bad
        result does not tear down the subscription.
        """
        fut = self.pending_tasks.pop(backend_key, None)
        if fut is None or fut.done():
            return
        try:
            with await self.pool as res_client:
                raw = await res_client.get(backend_key)
            if raw is None:
                raise LookupError(
                    'no result stored for key %r' % (backend_key, ))
            result = json.loads(raw.decode('utf-8'))['result']
        except asyncio.CancelledError:
            raise
        except Exception as err:
            if not fut.done():
                fut.set_exception(err)
        else:
            # The waiter may have timed out while we were fetching.
            if not fut.done():
                fut.set_result(result)

    async def commit(self, task, *args,
                     callback=None, timeout=30, **kwargs):
        """Send ``task`` to celery and wait for its result.

        Args:
            task: celery task; called as ``task.delay(*args, **kwargs)``.
            callback: optional callable invoked with the result once the
                task finished successfully.
            timeout: seconds to wait; ``None`` or a value ``<= 0`` waits
                without a limit.

        Returns:
            The ``result`` field of the payload stored by the backend.

        Raises:
            asyncio.TimeoutError: if no result arrived within ``timeout``.
        """
        task_result = task.delay(*args, **kwargs)
        backend_key = task.backend.get_key_for_task(task_result.task_id)
        fut = self.loop.create_future()
        self.pending_tasks[backend_key] = fut
        _logger.debug(self.pending_tasks)
        if callback:
            def _notify(done):
                # Only report real results; a cancelled or failed future
                # would make ``result()`` raise inside the callback.
                if done.cancelled() or done.exception() is not None:
                    return
                callback(done.result())
            fut.add_done_callback(_notify)
        # ``wait_for`` with ``timeout=None`` simply awaits the future.
        limit = None if (timeout is None or timeout <= 0) else timeout
        try:
            return await asyncio.wait_for(fut, timeout=limit)
        finally:
            # The entry may already be gone if the result handler popped it;
            # a default keeps the original exception from being masked.
            self.pending_tasks.pop(backend_key, None)
# Celery tasks.py
from celery import Celery
import time
# Celery application for the demo: redis database 0 carries the messages,
# redis database 1 stores the task results.
app = Celery(
    'tasks',
    broker='redis://:@127.0.0.1:6379/0',
    backend='redis://:@127.0.0.1:6379/1',
)
@app.task
def add(x, y, sleep=0):
    """Return ``x + y``, first sleeping ``sleep`` seconds when it is truthy."""
    delay = sleep
    if delay:
        time.sleep(delay)
    total = x + y
    return total
@jxskiss
Copy link
Author

jxskiss commented Jul 5, 2017

I forgot to mention: the redis server's "notify-keyspace-events" option must contain at least "E$", and redis version 2.8+ is required.

@ugljesas
Copy link

Great work on this, just ran into a problem with subscription connections becoming unresponsive after some time, without throwing any exceptions.

If you run into this issue with the latest version of Redis, change the following defaults in redis.conf to:
timeout 600
tcp-keepalive 60

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment