Celery worker blocks on rate limited task
=========================================
by github.com/miraculixx
Problem:
If a worker has a rate_limit active on some task, and that task arrives
(is received) faster than the rate limit allows, all worker processes
will block on these task instances and stop consuming other tasks as soon
as the prefetch count has maxed out (max_concurrency * prefetch multiplier).
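For reference, a minimal sketch of how the worker's prefetch limit is derived
(the variable names are illustrative; the numbers match the reproduction below):

    # Illustrative only: how many messages the worker reserves from the broker.
    max_concurrency = 4               # celery ... worker -c 4
    worker_prefetch_multiplier = 1    # app.conf.worker_prefetch_multiplier (see tasks.py)
    prefetch_count = max_concurrency * worker_prefetch_multiplier
    print(prefetch_count)             # at most 4 reserved (unacknowledged) messages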
Expected:
According to the documentation and a least-surprise interpretation [1][2],
setting a rate_limit should not affect any other tasks. At the very least,
the observed behavior should be stated as a warning in the documentation.
Workarounds:
* in general, set max_concurrency * worker_prefetch_multiplier to a number
  larger than the expected arrival rate of the rate limited tasks
* use autoscale to dynamically add workers if you have bursts of tasks;
  this grows and shrinks the pool of worker processes on demand, which
  can reduce the problem
* use separate workers and queues for rate limited tasks so that they can
  never saturate the workers serving all other tasks (see the routing
  sketch below)
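A minimal sketch of the last workaround, assuming a dedicated queue (the
queue name 'ratelimited' is purely illustrative):

    from celery import Celery

    app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')

    # Route the rate limited task to its own queue so it can only ever
    # saturate the worker dedicated to that queue.
    app.conf.task_routes = {'tasks.A': {'queue': 'ratelimited'}}

    # Run one worker per queue, e.g.:
    #   celery -A tasks worker -Q ratelimited -c 1   # consumes only tasks.A
    #   celery -A tasks worker -Q celery -c 4        # consumes all other tasks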
Environment:
$ celery -A tasks report
software -> celery:4.4.0 (cliffs) kombu:4.6.7 py:3.7.6
billiard:3.6.2.0 redis:3.4.1
platform -> system:Linux arch:64bit
kernel version:4.15.0-76-generic imp:CPython
loader -> celery.loaders.app.AppLoader
settings -> transport:redis results:redis://localhost/
broker_url: 'redis://localhost:6379//'
result_backend: 'redis://localhost/'
How to reproduce
----------------
1. Create a sample celery app with two tasks A and B (see tasks.py)
2. Run with one worker, max concurrency set to 4
$ celery -A tasks worker --loglevel=DEBUG -c 4
3. Use the command line to rate_limit task A to 1/h
$ celery -A tasks control rate_limit tasks.A 1/h
-> celery@arwen: OK
new rate limit set successfully
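The same rate limit can also be set from Python (a sketch, equivalent to the
CLI command above):

    from tasks import app

    # Broadcast a 1/h rate limit for tasks.A to all connected workers.
    app.control.rate_limit('tasks.A', '1/h')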
4. Submit task A 5 times
# do this 5 times
$ celery -A tasks call tasks.A 2 4
Result: The worker log shows 5 "Received task" messages but executes only
the first one (that's the rate limit in effect, as expected).
[2020-02-22 14:15:26,771: INFO/ForkPoolWorker-1] Task tasks.A[ea7dcee8-ac98-4e6c-88e8-51dfd0e9cdc6] succeeded in 0.002578758983872831s: 6
[2020-02-22 14:15:27,534: INFO/MainProcess] Received task: tasks.A[9af3010e-1f79-462f-b77d-2c56ccbb1d20]
[2020-02-22 14:15:28,069: INFO/MainProcess] Received task: tasks.A[5f7315d5-7f53-443a-8f48-1e9365cf7c52]
[2020-02-22 14:15:28,595: INFO/MainProcess] Received task: tasks.A[51b04b84-831e-45cb-9b73-7481f7707589]
[2020-02-22 14:15:29,225: INFO/MainProcess] Received task: tasks.A[474f7ad4-48b6-4c26-9066-4324784e12ee]
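For reference, the same submissions from Python (a sketch using the task
defined in tasks.py):

    from tasks import A

    # Submit task A five times, same as calling the CLI five times.
    results = [A.delay(2, 4) for _ in range(5)]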
5. Call task B.
$ celery -A tasks call tasks.B --args '[2, 4]'
55deebde-edf5-41ae-adc1-a78b323c4b39
Result: Task B is neither received nor executed until the next time a task A
is run (i.e. at the next rate limit interval). The 4 rate limited tasks
submitted in step 4 effectively block all the worker processes from
consuming any more tasks.
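The result backend confirms this; a sketch using the task id printed by the
call command in this step:

    from celery.result import AsyncResult
    from tasks import app

    result = AsyncResult('55deebde-edf5-41ae-adc1-a78b323c4b39', app=app)
    print(result.state)   # stays PENDING while the pool is saturated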
6. Check worker status
# looking good
$ celery -A tasks inspect registered
-> celery@arwen: OK
* tasks.A [rate_limit=1/h]
* tasks.B
$ celery -A tasks inspect active
-> celery@arwen: OK
- empty -
$ celery -A tasks inspect scheduled
-> celery@arwen: OK
- empty -
$ celery -A tasks inspect reserved
-> celery@arwen: OK
- empty -
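The same checks can be run from Python (sketch):

    from tasks import app

    insp = app.control.inspect()
    print(insp.registered())   # lists tasks.A [rate_limit=1/h] and tasks.B
    print(insp.active())       # empty while the worker is blocked
    print(insp.scheduled())    # empty
    print(insp.reserved())     # empty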
7. Grow the process pool; task B will now execute
$ celery -A tasks control pool_grow 1
-> celery@arwen: OK
pool will grow
Result: Task B is received and run by the new worker process
[2020-02-22 14:18:19,677: INFO/MainProcess] Received task: tasks.B[55deebde-edf5-41ae-adc1-a78b323c4b39]
(...)
[2020-02-22 14:18:19,678: INFO/ForkPoolWorker-1] Task tasks.B[55deebde-edf5-41ae-adc1-a78b323c4b39] succeeded in 0.0005468999734148383s: -2
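The pool can also be grown from Python (sketch, same effect as the CLI
command above):

    from tasks import app

    # Add one process to the worker's pool.
    app.control.pool_grow(1)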
8. Submit another task A; this will block the new worker too
$ celery -A tasks call tasks.A 2 4
$ celery -A tasks call tasks.B 2 4
Result: Task B is again neither received nor executed.
References
----------
[1] https://groups.google.com/forum/#!searchin/celery-users/rate$20limit$20stops$20worker%7Csort:date/celery-users/NBw6EqXwVuA/7SK0fNCnAQAJ
[2] https://freenode.logbot.info/celery/20200221#c3282156
docker-compose.yml
------------------
version: '2'
services:
  redis:
    image: redis
    ports:
      - "6379:6379"
requirements.txt
----------------
celery==4.4.0
honcho==1.0.1
redis==3.4.1
tasks.py
--------
from celery import Celery

app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')

# For demo purposes restrict the number of prefetched tasks to 1.
# This effectively limits the number of rate limited tasks that can be
# submitted to 4 before the worker gets blocked. Leaving it at the default
# value (4) means you need 4 * 4 = 16 tasks before the worker stops
# receiving new tasks. That is, the number of rate limited tasks required
# to block the worker increases, however it does not solve the problem
# (though setting the prefetch multiplier to some very large number,
# e.g. sys.maxsize, may be a workaround in some scenarios).
# https://docs.celeryproject.org/en/stable/userguide/configuration.html#worker-prefetch-multiplier
app.conf.worker_prefetch_multiplier = 1


@app.task
def A(x, y):
    return x + y


@app.task
def B(x, y):
    return x - y