Asynchronous tasks

Celery tasks on a Redis queue with Python 3

The idea is demonstrated below with Docker containers.

Initialization

Create a Docker network (called 'cel') so that our test containers are connected to each other but isolated from everything else:

docker network create cel
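You can check that the network was created with:

docker network ls --filter name=cel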

Step 1: set up the Redis database

With Docker it's as simple as (run this in its own terminal, since the container stays in the foreground):

docker run --rm --net cel --name myredis redis:alpine

Now we have a 'myredis' database running, reachable from other containers that will be added to the network 'cel'.
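To verify that the database is reachable from inside the network, you can run a throwaway container with redis-cli from a second terminal; it should answer PONG:

docker run --rm --net cel redis:alpine redis-cli -h myredis ping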

Step 2: connect a Celery worker

Let's create a new container with the latest stable Python:

docker run --rm -it --net cel --name myworker python:3.5.2-alpine ash

Install the required libraries and add a test user, since for security reasons Celery workers should not be executed as root:

/ # pip install redis celery requests
/ # adduser test -D && su - test

Now write your Celery app and its tasks. See worker.py, listed at the end of this page, for an example.

Then launch the Celery worker, pointing it at the Python module (in this case 'worker'):

celery worker -A worker -l info

The worker is now connected and ready to execute tasks when they are queued on redis.
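By default the worker spawns one process per CPU core; to control the parallelism explicitly, pass the standard --concurrency option:

celery worker -A worker -l info --concurrency=2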

Step 3: queue task requests from a Python client

Run a new container to act as the client:

docker run --rm -it --net cel --name myclient python:3.5.2-alpine ash
/ # pip install redis celery requests

The client must have access to the same code as the worker (e.g. the worker.py file) and use it to call delayed tasks. About delay, see the relevant Celery documentation.
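For reference, delay() is just a shortcut for the more general apply_async(), which also accepts execution options such as a countdown:

from worker import my_async_task

my_async_task.delay(3)                              # shortcut form
my_async_task.apply_async(args=(3,), countdown=10)  # same task, run ~10 seconds from now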

An example that queues 5 asynchronous tasks is in the file client.py, also listed at the end of this page.

Test the queue with

python client.py

and check what happens on the worker side.
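You can also peek at the broker directly: with the Redis transport, Celery's default queue is a Redis list named 'celery', so its length tells you how many tasks are still pending (assuming the default queue name):

docker run --rm --net cel redis:alpine redis-cli -h myredis llen celery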

Clean up

Remove containers and private network:

docker stop myredis myworker myclient
docker network rm cel

We can check on queued Celery tasks at any time. Let's see how.

Save the id

When you launch a task you get an AsyncResult back. To save its id you may do something like:

res = my_async_task.delay(i)
yourdb.save(res.id)
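Here yourdb is just a placeholder for whatever storage you have at hand. A minimal sketch that appends the ids to a plain text file instead (the file name is illustrative):

from worker import my_async_task

for i in range(5):
    res = my_async_task.delay(i)
    with open("task_ids.txt", "a") as f:
        f.write(res.id + "\n")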

Use the id to connect back to Celery somewhere else

from celery.result import AsyncResult

id = yourdb.get(filter=some_parameter)
res = AsyncResult(id)
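Note that AsyncResult is resolved against the current Celery app, so the app (and its result backend) must be configured in this process too; the simplest option is to import the app defined in worker.py and bind to it explicitly:

from worker import app

res = app.AsyncResult(id)  # explicitly bound to the app and its result backend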

Check the status

if res.ready():
    print("The task has finished")
    out = res.get()
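While the task is still running you can inspect res.state instead, which reports Celery's standard states:

print(res.state)  # e.g. 'PENDING', 'SUCCESS' or 'FAILURE'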

or just block and wait for the result:

out = res.get()
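Beware that get() blocks indefinitely by default; it also accepts a timeout in seconds, after which it raises celery.exceptions.TimeoutError:

out = res.get(timeout=10)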
The example files

worker.py

from celery import Celery

# Broker and result backend both point at the 'myredis' container;
# the backend is what lets clients retrieve results via AsyncResult/get()
app = Celery('MyCeleryApp',
             broker='redis://myredis:6379/0',
             backend='redis://myredis:6379/0')

# Skip initial warnings, avoiding pickle
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_RESULT_SERIALIZER = 'json'

@app.task
def my_async_task(arg):
    print("This is asynchronous: %s" % arg)

client.py

from worker import my_async_task

# Queue 5 tasks; each call returns an AsyncResult immediately
for i in range(5):
    my_async_task.delay(i)