@nuria
Last active October 22, 2021 21:19
Handlers that execute right before and right after a task is executed in celery.
from celery import Celery
from celery.signals import after_task_publish, task_success, task_prerun, task_postrun
# first argument: name of the main module; Celery only uses it to
# generate task names for tasks defined in __main__
app = Celery('tasks')
app.config_from_object('celeryconfig')
# To start a worker that imports this module (saved as task.py), run:
#   celery -A task worker --loglevel=info
# Then, once the worker is running, open a python console and run:
'''
from task import add
add.delay(4,4)
'''
# config (celeryconfig.py):
'''
from datetime import timedelta

BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_TASK_RESULT_EXPIRES = 3600
CELERY_DISABLE_RATE_LIMITS = True
CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
CELERYD_CONCURRENCY = 16
CELERYD_TASK_TIME_LIMIT = 3630
CELERY_TIMEZONE = 'UTC'
CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'task.add',
        'schedule': timedelta(seconds=30),
        'args': (16, 16)
    },
}
'''
@app.task
def add(x, y):
    return x + y


@app.task
def write(msg):
    # open the file for appending;
    # this task does not return anything
    with open('logfile', 'a') as f:
        f.write(str(msg))


@app.task
def evil_add(x, y):
    # always fails, to show how the hooks behave on an exception
    raise Exception('Boom!')
#####################################
'''
another example of how to add the 'hooks'

class Celery(object):
    @staticmethod
    def install():
        task_prerun.connect(task_prerun)
        task_postrun.connect(task_postrun)
'''
#####################################
# Note: naming the handlers after the signals rebinds task_prerun and
# task_postrun in this module only; celery.signals itself is unchanged.
@task_prerun.connect()
def task_prerun(signal=None, sender=None, task_id=None, task=None, args=None, kwargs=None, **extra):
    # **extra absorbs any keyword arguments this signature does not list
    if task.name == "task.add":
        print("pre-run of add. Do special add things here. Task: {0} sender: {1}".format(task, sender))


@task_postrun.connect()
def task_postrun(signal=None, sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **extra):
    # note that this hook runs even when the task has raised an exception
    print("post run {0} ".format(task))
@karolpawlowski

As far as I know, you shouldn't give the function decorated with @task_prerun.connect() the name task_prerun, or the one decorated with @task_postrun.connect() the name task_postrun.

These names shouldn't be redefined, since you are importing them from celery.signals.

@zinnov-manu

> As I know you shouldn't name method decorated with @task_prerun.connect() -> task_prerun and with @task_postrun.connect()-> task_postrun.
>
> They shouldn't be redefined since you importing these methods

Since the purpose is to override the behavior of the existing function, I think overriding shouldn't be a problem unless the original is needed elsewhere in the project.

@alexbodn

Thanks for the example.
This does not overwrite the names in celery.signals, only the names in this source file.
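
The point about the name being rebound only in the importing module can be checked without a running worker. The sketch below is not Celery code: Signal is a hypothetical stand-in that only mimics connect() and send(), and the signals namespace stands in for the celery.signals module. It assumes Python 3 for types.SimpleNamespace.

```python
import types


class Signal(object):
    """Hypothetical stand-in for a Celery signal; only mimics connect()/send()."""

    def __init__(self):
        self.receivers = []

    def connect(self, receiver=None):
        # Supports both signal.connect(func) and the @signal.connect() form.
        def register(func):
            self.receivers.append(func)
            return func  # the decorator hands the function back unchanged
        return register if receiver is None else register(receiver)

    def send(self, **named):
        return [receiver(**named) for receiver in self.receivers]


# Stand-in for the celery.signals module (an assumption for this sketch).
signals = types.SimpleNamespace(task_prerun=Signal())

# Equivalent of: from celery.signals import task_prerun
task_prerun = signals.task_prerun


@task_prerun.connect()
def task_prerun(task=None, **extra):
    return "pre-run of {0}".format(task)


# The local name now points at the handler function...
local_name_is_function = isinstance(task_prerun, types.FunctionType)
# ...but the attribute on the "module" is still the signal object,
library_name_is_signal = isinstance(signals.task_prerun, Signal)
# and the handler stays connected and fires when the signal is sent.
results = signals.task_prerun.send(task="task.add")
print(local_name_is_function, library_name_is_signal, results)
```

So the decorator is evaluated before the def rebinds the name, and the handler is registered either way. The practical cost of reusing the name is that later code in the same file can no longer reach the signal through it, which is why a distinct handler name is usually easier to work with.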
