Django, Docker, uWSGI, Celery, Redis

Django

python manage.py makemigrations --dry-run --verbosity 3 lets you preview what a migration will look like without actually creating the migration file.
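The same dry run can also be triggered from a Django shell with call_command; a minimal sketch (assumes DJANGO_SETTINGS_MODULE is already configured):

from django.core.management import call_command

# Equivalent to: python manage.py makemigrations --dry-run --verbosity 3
call_command('makemigrations', dry_run=True, verbosity=3)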

Docker

uWSGI

Celery

Celery is a tool that helps you manage tasks that should occur outside the request/response cycle. Any operation that takes more than about half a second is a good candidate for turning into a Celery task. Celery is especially helpful for transforming blocking operations on your site into non-blocking ones by offloading the work to background tasks.

You can use Celery to send email, update your database with side effects from the request that was just processed, query an API and store the result, and a lot more. Another thing Celery is helpful for is scheduling tasks to run at specific times. You might be familiar with cron jobs, which are tasks that run at specific intervals you define.
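As an illustration of offloading a blocking operation, sending email from a task could look roughly like this (a hedged sketch; the send_welcome_email task and the addresses are made up for the example):

from celery import shared_task
from django.core.mail import send_mail


@shared_task
def send_welcome_email(address):
    # Runs on a worker, so the web request returns immediately.
    send_mail(
        'Welcome!',
        'Thanks for signing up.',
        'noreply@example.com',
        [address],
    )


# In a view: send_welcome_email.delay(user.email)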

requirements.txt

celery==4.2.1
redis==2.10.6

celery.py

import os
from celery import Celery

# Set the default Django settings module before the Celery app is created.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Pull all CELERY_-prefixed settings from Django's settings.py.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Look for a tasks.py module in every installed app.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

tasks.py

from celery import shared_task
from tapioca.exceptions import TapiocaException
from tapioca_facebook import Facebook


@shared_task
def hello():
    print('Hello there!')


# Retries automatically on API errors, backing off exponentially between attempts.
# ACCESS_TOKEN is assumed to be defined elsewhere (e.g. in settings).
@shared_task(bind=True, autoretry_for=(TapiocaException,), retry_backoff=True)
def likes_do_facebook(self):
    api = Facebook(access_token=ACCESS_TOKEN)
    api.user_likes(id='me').get()

In proj/__init__.py, add the following:

from .celery import app as celery_app

__all__ = ['celery_app']

To test that your hello() task works, you can run it locally as a regular Python function. Start a Python shell using docker-compose run web ./manage.py shell. Run:

>>> from app.tasks import hello
>>> hello()
Hello there!

If you would like to test running your task as a Celery task, run:

>>> hello.delay()
<AsyncResult: ba845cf3-e60b-4432-a9d8-9943621cb8a0>
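Because a result backend is configured (CELERY_RESULT_BACKEND in settings.py below), the returned AsyncResult can also be inspected; a small sketch:

>>> result = hello.delay()
>>> result.ready()          # True once a worker has finished the task
>>> result.get(timeout=10)  # blocks for the result; None here, since hello() only prints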

Celery Beat

We will use a feature called Celery beat to schedule our task to run periodically. Celery beat is the Celery scheduler. It executes tasks as often as you tell it to.

settings.py

from celery.schedules import crontab

CELERY_BROKER_URL = 'redis://redis:6379'
CELERY_RESULT_BACKEND = 'redis://redis:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

CELERY_BEAT_SCHEDULE = {
    'hello': {
        'task': 'app.tasks.hello',
        'schedule': crontab()  # execute every minute
    }
}
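crontab() with no arguments fires every minute. Other schedules can be expressed with crontab arguments or as a plain number of seconds; a small sketch reusing the hello task (the entry names are arbitrary):

CELERY_BEAT_SCHEDULE = {
    'hello-every-minute': {
        'task': 'app.tasks.hello',
        'schedule': crontab(),  # every minute
    },
    'hello-monday-morning': {
        'task': 'app.tasks.hello',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),  # Mondays at 7:30
    },
    'hello-every-30-seconds': {
        'task': 'app.tasks.hello',
        'schedule': 30.0,  # plain seconds also work
    },
}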

docker-compose.yml

version: '3'

services:
  db:
    image: postgres:9.6.5
    volumes:
      - postgres_data:/var/lib/postgresql/data/
  redis:
    image: "redis:alpine"
  web:
    build: .
    command: bash -c "python /code/manage.py migrate --noinput && python /code/manage.py runserver 0.0.0.0:8000"
    volumes:
      - .:/code
    ports:
      - "8000:8000"
    depends_on:
      - db
      - redis
  celery:
    build: .
    command: celery -A proj worker -l info
    volumes:
      - .:/code
    depends_on:
      - db
      - redis
  celery-beat:
    build: .
    command: celery -A proj beat -l info
    volumes:
      - .:/code
    depends_on:
      - db
      - redis

volumes:
  postgres_data:

Flower

You can use Flower to monitor all your Celery workers.

Installing Flower with pip is simple:

$ pip install flower

Launch the server and open http://localhost:5555

$ flower -A proj --port=5555

Or, launch from Celery

$ celery flower -A proj --address=127.0.0.1 --port=5555

Broker URL and other configuration options can be passed through the standard Celery options

$ celery flower -A proj --broker=amqp://guest:guest@localhost:5672//

Celery Best Practices

  1. Do not use complex objects as task parameters. For example, avoid passing Django model instances; pass the id and look the object up inside the task:

# Good
@app.task
def my_task(user_id):
    user = User.objects.get(id=user_id)
    print(user.name)
    # ...

# Bad
@app.task
def my_task(user):
    print(user.name)
    # ...
  2. Do not wait for other tasks inside a task. If a task needs another task's result, chain them instead (see the sketch after this list).
  3. Prefer idempotent tasks:

"Idempotence is the property of certain operations in mathematics and computer science, that can be applied multiple times without changing the result beyond the initial application." - Wikipedia. 4. Prefer atomic tasks:

"An operation (or set of operations) is atomic ... if it appears to the rest of the system to occur instantaneously. Atomicity is a guarantee of isolation from concurrent processes. Additionally, atomic operations commonly have a succeed-or-fail definition—they either successfully change the state of the system, or have no apparent effect." - Wikipedia. 5. Retry when possible. But make sure tasks are idempotent and atomic before doing so. 6 Set retry_limit to avoid broken tasks to keep retrying forever.

  7. Back off exponentially if things look like they are not going to get fixed soon, and throw in a random factor so many retrying tasks do not hit the service at the same moment:

import random


def exponential_backoff(task_self):
    # Base delay in minutes with jitter, raised to the number of retries so far.
    minutes = task_self.default_retry_delay / 60
    rand = random.uniform(minutes, minutes * 1.3)
    return int(rand ** task_self.request.retries) * 60


# inside the task (self is the bound task, e the caught exception)
raise self.retry(exc=e, countdown=exponential_backoff(self))
  8. Use autoretry_for to reduce the boilerplate code for retrying tasks.

  9. Use retry_backoff to reduce the boilerplate code when doing exponential backoff.

  10. For tasks that require a high level of reliability, use acks_late in combination with retry. Again, make sure tasks are idempotent and atomic.

  11. Set hard and soft time limits. Recover gracefully if things take longer than expected:

from celery.exceptions import SoftTimeLimitExceeded


@app.task(time_limit=60, soft_time_limit=45)
def my_task():
    try:
        something_possibly_long()
    except SoftTimeLimitExceeded:
        recover()

  12. Use multiple queues to have more control over throughput and make things more scalable. (Routing Tasks)

  13. Extend the base task class to define default behaviour. (Custom Task Classes)
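Below is a rough sketch of the chaining approach mentioned in item 2: instead of blocking inside one task while waiting for another, link them so the first task's return value is passed to the second. The fetch_report and email_report names are invented for the example.

from celery import chain, shared_task


@shared_task
def fetch_report(report_id):
    # ... gather the report data ...
    return {'report_id': report_id}


@shared_task
def email_report(report_data):
    # ... send the report somewhere ...
    pass


# Enqueue the two tasks as a chain; fetch_report's result becomes
# email_report's first argument. Neither task blocks on the other.
chain(fetch_report.s(42), email_report.s()).delay()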

Redis
