@JeOam
Last active November 11, 2020 16:39
Celery Tips

##### What is Celery?
Celery is an asynchronous task queue. You can use it to execute tasks outside of the context of your application. The general idea is that any resource consuming tasks that your application may need to run can be offloaded to the task queue, leaving your application free to respond to client requests.

Celery has three core components:

  • The Celery client. This is used to issue background jobs.
  • The Celery workers. These are the processes that run the background jobs. Celery supports local and remote workers, so you can start with a single worker running on the same machine as your application server, and later add more workers as the needs of your application grow.
  • The message broker. The client communicates with the the workers through a message queue, and Celery supports several ways to implement these queues. The most commonly used brokers are RabbitMQ and Redis.
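The division of labor among the three components can be sketched with a pure-Python analogy — a `queue.Queue` standing in for the broker and a thread for the worker. This is illustrative only, not Celery's API:

```python
import queue
import threading

broker = queue.Queue()   # the "message broker": holds jobs until a worker takes them
results = {}

def worker():
    # the "worker": pulls jobs off the broker and runs them
    while True:
        job_id, func, args = broker.get()
        results[job_id] = func(*args)
        broker.task_done()

# one local worker; real Celery workers can also run on remote machines
threading.Thread(target=worker, daemon=True).start()

# the "client": enqueue a job and stay free to serve requests
broker.put(("job-1", lambda x, y: x + y, (4, 4)))

broker.join()            # block here only to demonstrate the result
print(results["job-1"])  # 8
```

Celery replaces the in-process queue with a real broker (RabbitMQ or Redis), which is what lets the client and workers live in separate processes or on separate machines.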



JeOam commented Jul 14, 2015

Set up Celery message broker: RabbitMQ

# ubuntu
sudo apt-get install rabbitmq-server

Enable the RabbitMQ management plugin (optional):

sudo rabbitmq-plugins enable rabbitmq_management

Restart RabbitMQ:

sudo service rabbitmq-server restart


JeOam commented Jul 14, 2015

Using Celery

Installation:

$ pip install celery

Project layout:

proj/__init__.py
proj/celery.py
proj/tasks.py

proj/celery.py

from __future__ import absolute_import

from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()

proj/tasks.py:

from __future__ import absolute_import

from proj.celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)
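A decorated task remains an ordinary callable, so you can unit-test it without a broker. Conceptually, `@app.task` also registers the function in the app's task registry under its name. A simplified pure-Python model of that behavior (not Celery's actual implementation):

```python
class MiniApp:
    """Toy stand-in for celery.Celery's task registry (illustrative only)."""
    def __init__(self):
        self.tasks = {}

    def task(self, func=None, *, name=None):
        # support both @app.task and @app.task(name=...)
        def register(f):
            self.tasks[name or f.__name__] = f
            return f
        return register(func) if func else register

app = MiniApp()

@app.task
def add(x, y):
    return x + y

@app.task(name='task.mul')
def mul(x, y):
    return x * y

print(sorted(app.tasks))  # registered names: 'add', 'task.mul'
print(add(4, 4))          # still directly callable: 8
```

The explicit `name=` form matters later for routing, because route keys are matched against the registered task name.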

Starting the worker:

# run this from the directory that contains proj/
$ celery -A proj worker -l info

Calling a task from a Python shell (the result becomes available once the worker has finished it):

>>> result = add.delay(4, 4)
>>> result.ready()
False
>>> result.result
8

via Using Celery in your Application


JeOam commented Aug 3, 2015

Note: if you modify the add function above, you must restart the Celery worker before Celery will call the modified version.

A quick-and-dirty way to check whether a Celery task has actually been invoked:

# truncate the file, then append a marker inside the task
open("/tmp/temp.txt", "w").close()
with open("/tmp/temp.txt", "a") as text_file:
    text_file.write("============\n")
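The same trick with less boilerplate, using the standard logging module instead of manual file handling (a suggested alternative, not from the original gist):

```python
import logging

# append markers to the same file via a FileHandler (opens in append mode)
logger = logging.getLogger("celery_debug")
logger.setLevel(logging.INFO)
logger.addHandler(logging.FileHandler("/tmp/temp.txt"))

logger.info("============")  # call this inside the task body
```

A handler flushes on every record, so the marker shows up immediately in `tail -f`.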


JeOam commented Sep 17, 2015

flower: Real-time monitor and web admin for Celery distributed task queue

Installation:

pip install flower

Usage:

sudo flower --address=0.0.0.0 --port=80 --basic_auth=username:password

Running behind a reverse proxy


JeOam commented Sep 25, 2015

Different Tasks in Different Queues

Project layout:

test.py
Labs/__init__.py
Labs/celery.py
Labs/tasks.py

Labs/celery.py

# coding=utf-8
from __future__ import absolute_import
from celery import Celery

app = Celery('Labs',
             broker='amqp://',
             backend='amqp://',
             include=['Labs.tasks'])

app.conf.update(
    # the route key must match the task's registered name,
    # i.e. the name set via @app.task(name='task.test_a')
    CELERY_ROUTES={'task.test_a': {'queue': 'test_queue'}}
)

if __name__ == '__main__':
    app.start()

Labs/tasks.py

# coding=utf-8
from __future__ import absolute_import
from Labs.celery import app
import time

@app.task(name='task.test_a')
def test_a():
    text_file = open("/tmp/temp.txt", "a")
    text_file.write('====== a ======\n')
    text_file.close()
    time.sleep(5)

@app.task
def test_b():
    text_file = open("/tmp/temp.txt", "a")
    text_file.write('====== b ======\n')
    text_file.close()
    time.sleep(1)

test.py

# coding=utf-8
from __future__ import absolute_import
from celery import group
from Labs.tasks import test_a, test_b

if __name__ == '__main__':
    open("/tmp/temp.txt", 'w').close()
    jobs_a = group([test_a.s() for i in range(0, 10)])
    jobs_b = group([test_b.s() for i in range(0, 10)])
    jobs_a.apply_async()
    jobs_b.apply_async()

Start two workers:

$ celery -A Labs worker -l info -Q celery
$ celery -A Labs worker -l info -Q test_queue,celery
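Routing works by matching each task's registered name against the keys of CELERY_ROUTES: `task.test_a` is sent to `test_queue`, while `test_b` (whose auto-generated name is `Labs.tasks.test_b`) falls through to the default `celery` queue. A simplified model of that lookup (illustrative, not Celery internals):

```python
CELERY_ROUTES = {'task.test_a': {'queue': 'test_queue'}}

def route(task_name, routes=CELERY_ROUTES, default='celery'):
    # tasks without an explicit route land on the default queue
    return routes.get(task_name, {}).get('queue', default)

print(route('task.test_a'))        # test_queue
print(route('Labs.tasks.test_b'))  # celery
```

This is why the first worker (consuming only `celery`) never sees `test_a` jobs, while the second consumes both queues.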

Tail the result file to watch the task output, and run the test script in another terminal:

$ tail -f /tmp/temp.txt
# another terminal
$ python test.py
