Skip to content

Instantly share code, notes, and snippets.

@auvipy
Forked from sameerkumar18/yourapp_admin.py
Created January 16, 2024 08:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save auvipy/2a56caa57581fc4607060edb6ddacc04 to your computer and use it in GitHub Desktop.
Save auvipy/2a56caa57581fc4607060edb6ddacc04 to your computer and use it in GitHub Desktop.
Django Admin: Celery Retry Task by ID
# Add this code to any Django app's admin.py.
# Only tasks in the "FAILURE" state are retried; edit the status check at the
# top of the loop in retry_celery_task_admin_action to allow other states.
import ast
import importlib
import json

from django.contrib import admin
from django.contrib import messages
from django.utils.html import format_html_join
from django.utils.safestring import mark_safe
from django_celery_results.admin import TaskResultAdmin
from django_celery_results.models import TaskResult
def _decode_stored_arguments(raw, expected_type, default):
    """Decode a stored ``task_args`` / ``task_kwargs`` value.

    The stored text is tried as JSON first and as a Python literal second.
    Up to two layers are peeled, because the value may be a JSON string that
    itself wraps a Python ``repr`` (e.g. ``"{'a': 1}"``).
    NOTE(review): the exact storage format depends on the installed
    django-celery-results version -- confirm against your data.

    Args:
        raw: The value read from the result row (text, or ``None``).
        expected_type: Type or tuple of types the decoded value must have.
        default: Returned when nothing is stored (``None`` / empty text).

    Returns:
        The decoded value, or ``default`` when nothing is stored.

    Raises:
        TypeError: If the decoded value is not an ``expected_type`` instance.
        ValueError, SyntaxError: If the text is neither JSON nor a literal.
    """
    value = raw
    for _ in range(2):
        if not isinstance(value, str):
            break
        text = value.strip()
        if not text:
            value = None
            break
        try:
            value = json.loads(text)
        except ValueError:
            # Not JSON (e.g. single-quoted repr); literal_eval only accepts
            # literals, so no code from the stored text is executed.
            value = ast.literal_eval(text)
    if value is None:
        return default
    if not isinstance(value, expected_type):
        raise TypeError(
            f'stored arguments decoded to {type(value).__name__}, which is not usable'
        )
    return value


def retry_celery_task_admin_action(modeladmin, request, queryset):
    """Admin action: re-queue every selected task result that is in "FAILURE".

    For each selected row the task callable is looked up by importing the
    module part of ``task_name`` and reading the final dotted component from
    it; the task is then sent with ``apply_async`` using the stored args and
    kwargs and the row's original ``task_id``. Rows in any other state are
    skipped. A problem with one row never aborts the rest of the selection.

    Outcomes are reported through the messages framework, grouped by level:
    queued rows as success, skipped rows as warning, failures as error.

    Args:
        modeladmin: The ModelAdmin the action was invoked from (unused).
        request: The current request; receives the outcome messages.
        queryset: The selected task result rows.
    """
    queued, skipped, failed = [], [], []

    for task_res in queryset:
        task_id = task_res.task_id
        if task_res.status != 'FAILURE':
            skipped.append(f'{task_id} => Skipped. Not in "FAILURE" State')
            continue
        try:
            # NOTE(review): this imports whatever module the stored task name
            # points at; it assumes the task name is its dotted import path.
            module_name, _, attr_name = (task_res.task_name or '').rpartition('.')
            if not module_name or not attr_name:
                raise ValueError(
                    f'task name {task_res.task_name!r} is not a dotted import path'
                )
            task = getattr(importlib.import_module(module_name), attr_name)
            args = _decode_stored_arguments(task_res.task_args, (list, tuple), ())
            kwargs = _decode_stored_arguments(task_res.task_kwargs, dict, {})
            # Send args and kwargs together so a task that was called with
            # both is replayed with its full original signature.
            task.apply_async(args=args, kwargs=kwargs, task_id=task_id)
        except Exception as ex:  # per-row boundary: report and keep going
            failed.append(f'{task_id} => Unable to process. Error: {ex}')
        else:
            queued.append(f'{task_id} => Successfully sent to queue for retry.')

    line_break = mark_safe('<br>')
    for notify, lines in (
        (messages.success, queued),
        (messages.warning, skipped),
        (messages.error, failed),
    ):
        if lines:
            # format_html_join escapes each line, so exception text or ids
            # containing markup cannot inject HTML into the admin page.
            notify(request, format_html_join(line_break, '{}', ((line,) for line in lines)))


retry_celery_task_admin_action.short_description = 'Retry Task'
# Drop the admin that was registered for TaskResult so the subclass below can
# take its place.
admin.site.unregister(TaskResult)


@admin.register(TaskResult)
class CustomTaskResultAdmin(TaskResultAdmin):
    """TaskResultAdmin subclass whose ``actions`` list is the retry action."""

    actions = [retry_celery_task_admin_action]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment