Skip to content

Instantly share code, notes, and snippets.

@mlavin
Created September 23, 2013 14:17
Show Gist options
  • Star 34 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save mlavin/6671079 to your computer and use it in GitHub Desktop.
Save mlavin/6671079 to your computer and use it in GitHub Desktop.
Celery Late Ack Example

Running the Example

Start the worker:

celery -A tasks worker --loglevel=info -c 2 --pidfile=celery.pid

In another terminal send 6 tasks:

python script.py

You should see task 1 and task 2 start. Before they complete kill the worker gracefully:

# Send graceful shutdown
kill -TERM `cat celery.pid`
# Send a second TERM to complete the shutdown
kill -TERM `cat celery.pid`

Or forcefully:

ps auxww | grep celery | grep -v grep | awk '{print $2}' | xargs kill -SIGKILL

Now restart the worker:

celery -A tasks worker --loglevel=info -c 2 --pidfile=celery.pid

Findings

With the RabbitMQ backend the worker will start up and begin processing tasks 1 and 2.

With the current Redis backend the worker will begin processing task 3 and 4 then 5 and 6 and finally 1 and 2. The restore_all_unacknowledged_messages run on start up makes it so 1 and 2 are picked up as soon as the 5 and 6 are finished otherwise they would not run until after the default visibility timeout (1 hour).

With the updated Redis backend the worker will begin processing task 1 and 2 as desired. This still requires the use of restore_all_unacknowledged_messages to ensure that the messages are restored instantly rather than waiting for the visibility timeout. This is probably not a good idea if you have multiple workers configured to consumer from the same queue which are not always restarted together.

from time import sleep
from tasks import longtask
for i in range(6):
longtask.delay(i + 1)
sleep(0.1)
import logging
import time
from celery import Celery
from celery.signals import worker_init
from kombu.transport import TRANSPORT_ALIASES
TRANSPORT_ALIASES['newredis'] = 'transport.Transport'
# celery = Celery('tasks', broker='redis://localhost:6379/0')
# celery = Celery('tasks', broker='amqp://guest:guest@localhost:5672//')
celery = Celery('tasks', broker='newredis://localhost:6379/0')
celery.conf.update(
CELERYD_PREFETCH_MULTIPLIER=1,
CELERY_IGNORE_RESULT=True,
)
# Not needed for RabbitMQ
def restore_all_unacknowledged_messages():
conn = celery.connection(transport_options={'visibility_timeout': 0})
qos = conn.channel().qos
qos.restore_visible()
print 'Unacknowledged messages restored'
# Not needed for RabbitMQ
@worker_init.connect
def configure(sender=None, conf=None, **kwargs):
restore_all_unacknowledged_messages()
@celery.task(acks_late=True)
def longtask(n):
logging.info('Starting long task %s', n)
time.sleep(60)
"""New Redis transport with correct ack emulation."""
from kombu.transport.redis import logger, dumps
from kombu.transport.redis import Channel as BaseChannel
from kombu.transport.redis import Transport as BaseTransport
class Channel(BaseChannel):
def _do_restore_message(self, payload, exchange, routing_key, client=None):
with self.conn_or_acquire(client) as client:
try:
try:
payload['headers']['redelivered'] = True
except KeyError:
pass
for queue in self._lookup(exchange, routing_key):
client.rpush(queue, dumps(payload))
except Exception:
logger.critical('Could not restore message: %r', payload,
exc_info=True)
class Transport(BaseTransport):
Channel = Channel
@alexmojaki
Copy link

What's newredis? Can't see any info about that anywhere.

@erdnaxeli
Copy link

newredis is the class Transport defined on transport.py.

@tomassykora
Copy link

tomassykora commented Nov 25, 2018

This really helped me, thanks! Although if the tasks running in restore_visible() are killed too, another call to restore_all_unacknowledged_messages() won't execute them anymore. Is there something to do about this?

@yataosu
Copy link

yataosu commented Aug 13, 2019

Thank you for this!
This really helped me!

@mintyPT
Copy link

mintyPT commented Nov 9, 2021

Still helping in 2021...

@Genarito
Copy link

Thank you very much! After days of looking for a solution to retrieve pending tasks that were interrupted by worker crashes this solution is the only one that perfectly worked for me! 🥳

@FluffyDietEngine
Copy link

Still helping in 2021...

Still helping in 2023 🥇

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment