Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save peketamin/1be94ea42efef013c74816848914a7c8 to your computer and use it in GitHub Desktop.
Save peketamin/1be94ea42efef013c74816848914a7c8 to your computer and use it in GitHub Desktop.
Investigation: SoftTimeLimitExceeded occurred in task.chunks()

Experiment

Assuming: Using Celery with Django.

Target task is debug_task (in my_app.celery)

import os
from celery import Celery, Task
from celery.utils.log import get_task_logger
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')
from django.conf import settings  # noqa

app = Celery('my_app')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

logger = get_task_logger('smdb.tasks')

class MyTask(Task):

    def on_success(self, retval, task_id, args, kwargs):
        logger.info(f'[MyTask.on_success] successしたよ')
        super().on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.warning(f'[MyTask.on_failure] exc: {exc}, task_id: {task_id}, args: {args}, kwargs: {kwargs}, einfo: {einfo}')
        super().on_failure(exc, task_id, args, kwargs, einfo)


@app.task(bind=True, name='debug_task', base=MyTask)
def debug_task(self, arg=1):
    import time
    time.sleep(5)
    logger.warning('Request: {0!r}'.format(self.request))
    # if arg != 3:
    #   raise Exception('Yah, this is test.')
    print(f'Congrats: {arg}')


from celery.signals import task_failure


@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, einfo=None, **kwargs):
    logger.warning(f'[task_failure_handler] sender:{sender}, task_id:{task_id}, exception:{exception}, traceback:{traceback}, einfo:{einfo}, kwargs:{kwargs}')

my_app.settings

CELERYD_TASK_SOFT_TIME_LIMIT = 10

In Django shell...

from celery.exceptions import SoftTimeLimitExceeded
from my_app.celery import debug_task, app

@app.task(bind=True)                               
def callback(self, *args, **kwargs):   
    print(f"args:{args}, kwargs:{kwargs}")


grp = debug_task.on_failure().chunks([[1], [2], [3], [4], [5], [6], [7]], 2)
for task_chunk in tuple(grp.group().items())[2][1]['tasks']:
    print(f"[debug_task.chunks] chunk: {task_chunk}")
ch = grp.on_error(callback)()
ch.get()
  • One chunk raise SoftTimeLimitExceeded because it takes over 10 seconds.
  • task.chunks() doesn't call on_failure()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment