@ionelmc
Last active December 16, 2015 11:49
celery-bug-1316
.builds
celeryd*.log
celeryd*.pid
*.deb
*.komodoproject
.komodotools
./*.log
:memory
./*.pid
.pip-cache
.project
*.pyc
.pydevproject
.settings
test-results.xml
tmp
.ve
#!.ve/bin/python
import gc
import os
os.environ['MP_LOG'] = '1'
os.environ['CELERY_RDBSIG'] = '1'
import time
import sys
from celery import Celery
celery = Celery(
    broker="amqp://test:test@localhost:5672/test"
)
celery.conf.update(
    CELERY_TASK_SERIALIZER = "pickle",
    CELERYD_MAX_TASKS_PER_CHILD = 1,
    CELERY_WORKER_DIRECT = True,
    BROKER_POOL_LIMIT = None,
    CELERY_RESULT_BACKEND = "mongodb",
    CELERY_MONGODB_BACKEND_SETTINGS = {},
    CELERY_DISABLE_RATE_LIMITS = True,
    CELERYD_PREFETCH_MULTIPLIER = 0,
    CELERY_MAX_CACHED_RESULTS = 0,
)

class Bubu(object):
    def __init__(self, data):
        self.data = data

@celery.task
def stuff(whatev):
    time.sleep(10)

if __name__ == '__main__':
    if len(sys.argv) > 1 and sys.argv[1] == 'produce-and-revoke':
        while 1:
            try:
                results = [stuff.apply_async(
                    args=(Bubu(os.urandom(200000)),),
                    options={
                        'queue_name': None,
                        'exchange': 'C.dq',
                        'routing_key': file('/etc/hostname').read().strip(),
                    }
                ) for i in range(50)]
                print results
                time.sleep(2)
                print [res.revoke(terminate=True) for res in results]
                print [res.get(propagate=False) for res in results]
            except Exception:
                import traceback
                traceback.print_exc()
                print 'wait 2 secs ...'
                time.sleep(2)
    else:
        #import tracemalloc
        #tracemalloc.enable()
        #top = tracemalloc.DisplayTop(5000, file=open('/tmp/memory-profile-%s' % time.time(), "w"))
        #top.show_lineno = True
        #try:
        # celery.start()
        #finally:
        # top.display()
        try:
            celery.start()
        finally:
            sys.stdout = sys.__stdout__
            sys.stderr = sys.__stderr__
            import objgraph
            import IPython
            IPython.embed()
#!/bin/bash -eEx
rm -rf .ve
virtualenv .ve
#.ve/bin/pip install celery[mongo]
#.ve/bin/pip install django-celery-with-mongodb
#.ve/bin/pip install \
# amqp==1.0.8 \
# anyjson==0.3.3 \
# kombu==2.5.6 \
# python-dateutil==1.5 \
# billiard==2.7.3.21 \
# celery==3.0.15 \
# pymongo==2.5 \
.ve/bin/pip install --download-cache=.ve/.cache \
    https://github.com/celery/py-amqp/archive/master.zip \
    https://github.com/celery/billiard/archive/master.zip \
    https://github.com/celery/kombu/archive/master.zip \
    pymongo==2.5 \
    https://github.com/celery/celery/archive/master.zip \
    objgraph ipython
#.ve/bin/pip install --download-cache=.ve/.cache \
# -r ./celery/requirements/dev.txt \
# pymongo==2.5 \
# ./celery
#
# #https://github.com/celery/kombu/archive/master.zip \
# #https://github.com/celery/billiard/archive/master.zip \
sudo rabbitmqctl delete_vhost test || echo test vhost did not exist
sudo rabbitmqctl add_vhost test
sudo rabbitmqctl add_user test test || echo test user already created
sudo rabbitmqctl set_permissions -p test test ".*" ".*" ".*"

Overview

Currently tested with:

  • celery 3.0.15 + librabbitmq => memory leak
  • celery 3.0.15 + amqp => memory leak (but leaks less)
  • celery 3.0.19 + amqp => node failure (workers turn into zombies, master process hangs)
  • celery asynwrite (based on 3.0.19 ?) + billiard 2.7 + librabbitmq => memory leak
  • celery asynwrite (based on 3.0.19 ?) + billiard 2.7 + amqp => memory leak
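
Since the outcome differs per combination, it may help to record the exact versions installed next to each result. The following is a minimal sketch, not part of the original gist. It assumes Python 3.8+ (for `importlib.metadata`), whereas app.py itself is Python 2; the helper name `installed_versions` and the package list are illustrative. On the Python 2 virtualenv, `.ve/bin/pip freeze` gives the same information.

```python
from importlib.metadata import PackageNotFoundError, version

# Distributions named in the list above and in bootstrap.sh.
PACKAGES = ("celery", "kombu", "billiard", "amqp", "librabbitmq", "pymongo")


def installed_versions(names=PACKAGES):
    """Map each distribution name to its installed version, or None if absent."""
    found = {}
    for name in names:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            # Not installed in this environment (e.g. librabbitmq is optional).
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in sorted(installed_versions().items()):
        print("%-12s %s" % (name, ver or "not installed"))
```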

Steps to reproduce

In first terminal

./bootstrap.sh
./app.py worker --events --purge --loglevel=DEBUG --concurrency 5
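
When this worker process exits, app.py drops into an IPython shell with objgraph imported, so the surviving objects can be inspected. As a rough stdlib-only sketch of that kind of inspection (an illustration, not the author's method), the helpers below count garbage-collector-tracked objects by type name and report which types grew between two snapshots. The function names are hypothetical and the code assumes Python 3; `objgraph.show_growth()` offers a richer equivalent.

```python
import gc
from collections import Counter


def count_live_objects(type_names=None):
    """Count objects tracked by the garbage collector, grouped by type name."""
    counts = Counter(type(obj).__name__ for obj in gc.get_objects())
    if type_names is None:
        return counts
    # Restrict the result to the requested type names (0 if none are alive).
    return {name: counts.get(name, 0) for name in type_names}


def growth(before, after):
    """Return the type names whose count increased between two snapshots."""
    return {name: after[name] - before.get(name, 0)
            for name in after if after[name] > before.get(name, 0)}
```

In the shell, something like `count_live_objects(["Bubu"])` would show whether any task payload instances are still alive in the parent process; whether they are is exactly what such a check would have to establish.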

In second terminal

./app.py produce-and-revoke

In third terminal

while true; do sudo service rabbitmq-server restart; sleep 10; done
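
With all three terminals running, the leak should show up as steadily growing resident memory in the worker's main process. One way to watch it, sketched here under the assumption of a Linux `/proc` filesystem and Python 3, is to sample `VmRSS` from `/proc/<pid>/status`. The function names and the sampling interval are illustrative and not part of the gist.

```python
import re
import time

_VMRSS = re.compile(r"^VmRSS:\s+(\d+)\s+kB", re.MULTILINE)


def parse_vm_rss_kb(status_text):
    """Extract the VmRSS value (in kB) from the text of /proc/<pid>/status."""
    match = _VMRSS.search(status_text)
    return int(match.group(1)) if match else None


def read_rss_kb(pid):
    """Read the resident set size of a process, or None if it cannot be read."""
    try:
        with open("/proc/%d/status" % pid) as handle:
            return parse_vm_rss_kb(handle.read())
    except (IOError, OSError):
        # The process has exited or /proc is unavailable.
        return None


def watch(pid, interval=10, samples=30):
    """Print one RSS sample per interval so growth over time is visible."""
    for _ in range(samples):
        stamp = time.strftime("%H:%M:%S")
        print("%s pid=%d rss_kb=%s" % (stamp, pid, read_rss_kb(pid)))
        time.sleep(interval)
```

Calling `watch(<worker pid>)` from a fourth terminal prints a timestamped series. A process that has no `VmRSS` line, which is expected for a zombie, is reported as `None`.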