Reproduction of celery bug #2682, autoscale causes billiard to raise WorkerLostError
This is a minimal reproduction of the celery bug "--autoscale causes billiard to raise WorkerLostError":
https://github.com/celery/celery/issues/2682
To run this reproduction, launch the Celery worker and the RabbitMQ broker:
$ docker-compose up
In another window, launch 2000 tasks that each sum the numbers 1 to 30 million:
$ docker-compose exec worker python -c 'from tasks import *; launch_sum_ints(2000, millions=30)'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/app/tasks.py", line 38, in launch_sum_ints
    result.get()
  File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 635, in get
    on_message=on_message,
  File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 749, in join_native
    raise value
celery.backends.base.WorkerLostError: Worker exited prematurely: signal 15 (SIGTERM).
Most of the tasks complete, but one or two crash. The `docker-compose up` window logs slightly more detail:
worker_1 | [2017-12-07 15:49:16,217: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 15 (SIGTERM).',)
worker_1 | Traceback (most recent call last):
worker_1 |   File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 1223, in mark_as_worker_lost
worker_1 |     human_status(exitcode)),
worker_1 | billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 15 (SIGTERM).
NOTE: On my system, these crashes occur with `millions=30` but not `millions=10`. This suggests the bug depends on how much
CPU time each task burns, or how long each task runs. To reproduce on a faster computer you may need to increase the
value, as in the example below.
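For example, doubling the per-task workload (60 is only an illustrative value, not from the original instructions; pick whatever is large enough to trigger the crash on your machine):

$ docker-compose exec worker python -c 'from tasks import *; launch_sum_ints(2000, millions=60)'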
docker-compose.yml:

version: '2'
services:
  rabbit:
    image: rabbitmq:latest
  worker:
    build: .
    volumes:
      - .:/app
    depends_on:
      - rabbit
    command: celery -A tasks worker --loglevel=info --autoscale=100
Dockerfile:

FROM python:3.6-stretch
RUN pip install celery
RUN mkdir /app
WORKDIR /app
tasks.py:

from celery import Celery, group
from celery.utils.log import get_logger
from celery.worker.autoscale import Autoscaler as CeleryAutoscaler

info = get_logger(__name__).info

# JitterAutoscaler is a subclass of the normal Celery autoscaler that scales between
# max_concurrency and max_concurrency-1 on each call to _maybe_scale. This simulates a
# normal autoscaling worker whose reserved-task count jitters somewhere between
# min_concurrency and max_concurrency.
class JitterAutoscaler(CeleryAutoscaler):
    def _maybe_scale(self, req=None):
        if self.processes < self.max_concurrency:
            info("Scaling up 1 ...")
            self.scale_up(1)
        else:
            info("Scaling down 1 ...")
            self.scale_down(1)

# Start a standard celery app:
app = Celery('tasks', broker='pyamqp://guest:guest@rabbit/', backend='rpc://')
app.conf.worker_autoscaler = 'tasks:JitterAutoscaler'

# Task to burn a second or so of CPU time:
@app.task
def sum_ints(millions=30):
    sum(range(millions * 1000000))

# Launch a bunch of tasks in parallel. Collect the results so we'll get an exception
# if one of the workers raises WorkerLostError:
def launch_sum_ints(count=100, millions=30):
    job = group(sum_ints.s(millions) for _ in range(count))
    result = job.apply_async()
    result.get()
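To count how many of the tasks are lost, rather than stopping at the first exception, a small variant of `launch_sum_ints` can collect the failures. This helper is not part of the original gist; it is a sketch that relies on Celery's `join(propagate=False)` returning exception instances for failed tasks instead of re-raising them:

# Hypothetical helper, not in the original reproduction: report how many tasks
# were lost instead of raising on the first WorkerLostError.
def count_failures(count=100, millions=30):
    job = group(sum_ints.s(millions) for _ in range(count))
    result = job.apply_async()
    # With propagate=False, a failed task yields its exception as the result value:
    values = result.join(propagate=False)
    failures = [v for v in values if isinstance(v, Exception)]
    print("%d of %d tasks failed" % (len(failures), count))
    return failures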