Skip to content

Instantly share code, notes, and snippets.

@bblanchon
Created April 29, 2021 16:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bblanchon/55d5a4cea89e6afc0018ff9dfa6cd51f to your computer and use it in GitHub Desktop.
Save bblanchon/55d5a4cea89e6afc0018ff9dfa6cd51f to your computer and use it in GitHub Desktop.
Celery/Django: send email when task fails
from celery.signals import task_failure
from django.core.mail import mail_admins
from pprint import pformat
@task_failure.connect
def celery_task_failure_email(task_id, sender, exception, einfo, *args, **kwargs):
mail_admins(
f"ERROR: Task {sender.name} raised {exception.__class__.__name__}",
f"""Task {sender.name} with id {task_id} raised exception:
{exception!r}
args = {pformat(kwargs['args'])}
kwargs = {pformat(kwargs['kwargs'])}
{einfo}
""")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment