Assuming: Using Celery with Django.
Target task is debug_task
(in my_app.celery
)
import os
from celery import Celery, Task
from celery.utils.log import get_task_logger
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')
from django.conf import settings # noqa
app = Celery('my_app')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
logger = get_task_logger('smdb.tasks')
class MyTask(Task):
def on_success(self, retval, task_id, args, kwargs):
logger.info(f'[MyTask.on_success] successしたよ')
super().on_success(retval, task_id, args, kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.warning(f'[MyTask.on_failure] exc: {exc}, task_id: {task_id}, args: {args}, kwargs: {kwargs}, einfo: {einfo}')
super().on_failure(exc, task_id, args, kwargs, einfo)
@app.task(bind=True, name='debug_task', base=MyTask)
def debug_task(self, arg=1):
import time
time.sleep(5)
logger.warning('Request: {0!r}'.format(self.request))
# if arg != 3:
# raise Exception('Yah, this is test.')
print(f'Congrats: {arg}')
from celery.signals import task_failure
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, einfo=None, **kwargs):
logger.warning(f'[task_failure_handler] sender:{sender}, task_id:{task_id}, exception:{exception}, traceback:{traceback}, einfo:{einfo}, kwargs:{kwargs}')
my_app.settings
CELERYD_TASK_SOFT_TIME_LIMIT = 10
In Django shell...
from celery.exceptions import SoftTimeLimitExceeded
from my_app.celery import debug_task, app
@app.task(bind=True)
def callback(self, *args, **kwargs):
print(f"args:{args}, kwargs:{kwargs}")
grp = debug_task.on_failure().chunks([[1], [2], [3], [4], [5], [6], [7]], 2)
for task_chunk in tuple(grp.group().items())[2][1]['tasks']:
print(f"[debug_task.chunks] chunk: {task_chunk}")
ch = grp.on_error(callback)()
ch.get()
- One chunk raise SoftTimeLimitExceeded because it takes over 10 seconds.
- task.chunks() doesn't call on_failure()