@sameerkumar18
Last active January 16, 2024 08:12
Django Admin: Celery Retry Task by ID
# Add this code to any Django app's admin.py.
# The retry itself works for any task status; edit the status check at the
# top of the loop to change which statuses are allowed.
import ast
import importlib
import json

from django.contrib import admin
from django.contrib import messages
from django.utils.safestring import mark_safe
from django_celery_results.admin import TaskResultAdmin
from django_celery_results.models import TaskResult


def retry_celery_task_admin_action(modeladmin, request, queryset):
    msg = ''
    for task_res in queryset:
        # Only failed tasks are re-queued; everything else is reported as skipped.
        if task_res.status != 'FAILURE':
            msg += f'{task_res.task_id} => Skipped. Not in "FAILURE" State<br>'
            continue
        try:
            # Split 'package.module.task' into the module path and the task name.
            task_actual_name = task_res.task_name.split('.')[-1]
            module_name = '.'.join(task_res.task_name.split('.')[:-1])
            kwargs = json.loads(task_res.task_kwargs)
            if isinstance(kwargs, str):
                # The kwargs were stored as a repr inside a JSON string: decode again.
                kwargs = kwargs.replace("'", '"')
                kwargs = json.loads(kwargs)
            task = getattr(importlib.import_module(module_name), task_actual_name)
            if kwargs:
                task.apply_async(kwargs=kwargs, task_id=task_res.task_id)
            else:
                # No kwargs stored, so fall back to the positional arguments.
                args = ast.literal_eval(ast.literal_eval(task_res.task_args))
                task.apply_async(args, task_id=task_res.task_id)
            msg += f'{task_res.task_id} => Successfully sent to queue for retry.<br>'
        except Exception as ex:
            msg += f'{task_res.task_id} => Unable to process. Error: {ex}<br>'
    messages.error(request, mark_safe(msg))


retry_celery_task_admin_action.short_description = 'Retry Task'


class CustomTaskResultAdmin(TaskResultAdmin):
    actions = [retry_celery_task_admin_action, ]


admin.site.unregister(TaskResult)
admin.site.register(TaskResult, CustomTaskResultAdmin)
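
The admin action above finds the task by splitting the stored task name into a module path and an attribute name and importing the module. A minimal standalone sketch of that lookup, assuming the stored name really is an importable dotted path (a task registered with a custom name would not be). The helper name resolve_dotted_path is hypothetical, and json.dumps stands in for a real task:

```python
import importlib


def resolve_dotted_path(dotted_path):
    """Return the object named by a 'package.module.attribute' string.

    Hypothetical helper: it mirrors the split-and-import lookup in the
    admin action, but is not part of Django or Celery.
    """
    module_name, _, attr_name = dotted_path.rpartition('.')
    if not module_name:
        raise ValueError(f'{dotted_path!r} has no module part')
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# Standard-library stand-in for a task name such as 'myapp.tasks.send_email'.
dumps = resolve_dotted_path('json.dumps')
print(dumps({'ok': True}))  # prints {"ok": true}
```

Raising early on a name without a module part gives a clearer error than letting importlib fail on an empty module name.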
@verhovensky

It's worth mentioning that you need the option

CELERY_RESULT_EXTENDED = True

in your project_root/settings.py file for this code to work; otherwise you will get a TypeError.

PS: We can also drop importlib and use our app's Celery instance to start the new task.

@milosev1c

django-celery-results does not seem to store task_kwargs in JSON format, so json.loads may not work.
If your task kwargs can contain None, you have to handle that case yourself.
I used eval(task_res.task_kwargs) as a workaround until I fixed the task arguments problem.
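
A possibly safer alternative to eval is ast.literal_eval, which accepts only Python literals (including None) and does not execute arbitrary code. Below is a minimal sketch, assuming the stored value is either JSON or a Python-literal repr, possibly wrapped in one extra layer of quoting. The helper name parse_stored_value is hypothetical, and the exact storage format depends on your django-celery-results version:

```python
import ast
import json


def parse_stored_value(raw):
    """Decode a stored task_args/task_kwargs string into a Python object.

    Hypothetical helper: tries JSON first, then a Python literal, and
    unwraps up to three layers of string quoting.
    """
    value = raw
    for _ in range(3):
        if not isinstance(value, str):
            break  # already decoded into a dict, list, tuple, ...
        try:
            value = json.loads(value)
            continue
        except ValueError:
            pass  # not JSON; try a Python literal next
        try:
            value = ast.literal_eval(value)
        except (ValueError, SyntaxError):
            break  # undecodable text is returned unchanged
    return value


# A repr-style value with None, which json.loads alone would reject.
print(parse_stored_value("{'user_id': 7, 'note': None}"))
# prints {'user_id': 7, 'note': None}
```

If the result is still a string, the value could not be decoded, and the caller should probably report the task as unprocessable instead of re-queuing it.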

@eriktelepovsky

Have you tried something like this?

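def retry_task(task_id):
    # celery_app is your project's Celery instance. The 'name', 'args' and 'kwargs'
    # keys should only be present when CELERY_RESULT_EXTENDED is enabled.
    meta = celery_app.backend.get_task_meta(task_id)
    task = celery_app.tasks[meta['name']]
    task.apply_async(args=meta['args'], kwargs=meta['kwargs'])  # specify any other parameters you might be passing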
