Skip to content

Instantly share code, notes, and snippets.

@IrSent
Created November 12, 2016 19:12
Show Gist options
  • Star 14 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save IrSent/5e4820f6b187d3654967b55e27d5d204 to your computer and use it in GitHub Desktop.
Save IrSent/5e4820f6b187d3654967b55e27d5d204 to your computer and use it in GitHub Desktop.
Celery Best Practices
# Safe Queue Celery App Settings
from __future__ import absolute_import, unicode_literals
from celery import Celery
# Celery application: the AMQP broker (e.g. RabbitMQ) doubles as the result
# backend, and the modules listed in `include` are imported when the worker starts.
# NOTE(review): the 'amqp://' result backend is deprecated from Celery 4.0 on --
# confirm whether 'rpc://' should be used with the Celery version you run.
app = Celery(
    'some_project',
    broker='amqp://',
    backend='amqp://',
    include=['some_project.tasks'],
)

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,  # stored results expire after one hour (value in seconds)
    enable_utc=True,
    timezone='Europe/Kiev',
)

if __name__ == '__main__':
    app.start()
# Sentry
# By default CELERYD_HIJACK_ROOT_LOGGER = True, which lets the Celery worker
# take over the root logger and drop handlers that were configured beforehand.
# Turn it off so the custom logging handlers declared below stay in effect.
CELERYD_HIJACK_ROOT_LOGGER = False

LOGGING = {
    # Required by logging.config.dictConfig(); without it the configuration is
    # rejected with a ValueError. 1 is the only schema version defined so far.
    'version': 1,
    'handlers': {
        # Forwards ERROR (and more severe) records to Sentry.
        'celery_sentry_handler': {
            'level': 'ERROR',
            'class': 'core.log.handlers.CelerySentryHandler'
        }
    },
    'loggers': {
        # Records from the 'celery' logger go only to the Sentry handler;
        # propagate=False keeps them from also reaching ancestor loggers.
        'celery': {
            'handlers': ['celery_sentry_handler'],
            'level': 'ERROR',
            'propagate': False,
        },
    }
}
# ...
#
# Prioritize your tasks!
#
# One queue per priority level; each queue is bound to an exchange and a
# routing key that carry the same name as the queue itself.
# NOTE(review): Queue and Exchange are not imported in this excerpt --
# presumably `from kombu import Exchange, Queue` lives in the elided part; confirm.
CELERY_QUEUES = tuple(
    Queue(name, Exchange(name), routing_key=name)
    for name in ('high', 'normal', 'low')
)

# Anything that is not routed explicitly ends up on the "normal" queue.
CELERY_DEFAULT_QUEUE = CELERY_DEFAULT_EXCHANGE = CELERY_DEFAULT_ROUTING_KEY = 'normal'

CELERY_ROUTES = {
    # Time-critical work goes to the high-priority queue.
    'myapp.tasks.check_payment_status': {'queue': 'high'},
    # Housekeeping that can wait goes to the low-priority queue.
    'myapp.tasks.close_session': {'queue': 'low'},
}

# Keep task results only when you really need them (CELERY_IGNORE_RESULT = False).
# In every other case ignore them and, if some outcome has to be kept,
# store it yourself somewhere in the database.
CELERY_IGNORE_RESULT = True
# ...
# Launch your workers: one dedicated worker per priority queue.
#   -E  emit task events (needed by monitoring tools)
#   -l  log level
#   -n  node name of the worker
#   -Q  queue(s) the worker consumes from
celery worker -E -l INFO -n worker.high -Q high
celery worker -E -l INFO -n worker.normal -Q normal
celery worker -E -l INFO -n worker.low -Q low
# This worker is started without -Q, so it is not pinned to a single queue and
# can accept tasks when, for example, all other high-queue workers are busy.
celery worker -E -l INFO -n worker.whatever
# Use Flower to monitor your Celery app: `https://github.com/mher/flower`, `https://flower.readthedocs.io/`
$ pip install flower
# Launch the server and open http://localhost:5555
$ flower -A some_project --port=5555
# Or launch it as a Celery sub-command, binding it to the local interface only
# NOTE(review): `proj` presumably stands for the same application as `some_project` above -- confirm
$ celery flower -A proj --address=127.0.0.1 --port=5555
# The broker URL and other configuration options can be passed through the standard Celery options
$ celery flower -A proj --broker=amqp://guest:guest@localhost:5672//
# Keep the tasks in tasks.py small and readable.
# Don't put your business logic there; implement it elsewhere and just import it here.
from __future__ import absolute_import, unicode_literals
from .celery import app
from .utils import generate_report, send_email
# bind=True hands the task instance to the function as `self`; that is what
# makes self.retry() available for re-running the task after a failure.
@app.task(bind=True)
def send_report(self):
    """Generate the report and e-mail it, retrying later if sending fails.

    NOTE(review): `subject`, `message` and `SomeError` are not defined in this
    excerpt -- presumably placeholders for project-specific names; confirm.
    """
    # The report is built outside the try block: only a failed send is retried.
    report_path = generate_report()
    retry_delay = 60 * 5  # seconds, i.e. retry in 5 minutes
    try:
        send_email(subject, message, attachments=[report_path])
    except SomeError as send_failure:
        raise self.retry(countdown=retry_delay, exc=send_failure)
# Set time limits to avoid worker hangs: after 120 s the soft limit raises
# SoftTimeLimitExceeded inside the task so it can clean up, and after 180 s
# the hard limit terminates the process that is executing the task.
@app.task(bind=True, soft_time_limit=120, time_limit=180)
def some_long_task_with_high_risk_of_failure(self):
    """Placeholder for a long-running task that is likely to fail.

    The task is declared with bind=True, so Celery passes the task instance
    as the first positional argument; the function must therefore accept
    `self`, otherwise every invocation fails with a TypeError.
    """
    pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment