Skip to content

Instantly share code, notes, and snippets.

@anapaulagomes
Last active March 13, 2023 12:26
Show Gist options
  • Save anapaulagomes/8ae00bad282aafd1ec08471ac2944015 to your computer and use it in GitHub Desktop.
Save anapaulagomes/8ae00bad282aafd1ec08471ac2944015 to your computer and use it in GitHub Desktop.
Example of Celery signals
from celery.signals import task_failure, after_task_publish
@app.task(bind=True)
def my_task(self):
print(1/0) # this line will cause an exception
@after_task_publish.connect(sender='mymodule.tasks.my_task')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
info = headers if 'task' in headers else body
print('after_task_publish for task id {info[id]}'.format(
info=info,
))
@task_failure.connect(sender=my_task) # FIXME by name it doesn't work
def task_failure_handler(sender=None, headers=None, body=None, **kwargs):
print('This will be executed after if `my task` fails')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment