Skip to content

Instantly share code, notes, and snippets.

@debrice

debrice/README.md

Last active Dec 11, 2016
Embed
What would you like to do?
Celery task bug in a SQLAlchemy event

Celery Bug

When running a celery task, in a SQLAlchemy event, in a unit-test the connection get destroyed somehow.

I attached the requirement to reproduce the bug:

git clone https://gist.github.com/5762792fc1d628843697.git
cd 5762792fc1d628843697
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
python test.py

There is a bounty open for this bug on stack overflow

Flask==0.10.1
Flask-SQLAlchemy==2.0
Jinja2==2.7.3
MarkupSafe==0.23
SQLAlchemy==0.9.8
Werkzeug==0.9.6
amqp==1.4.6
anyjson==0.3.3
argparse==1.2.1
billiard==3.3.0.18
celery==3.1.16
itsdangerous==0.24
kombu==3.0.23
pytz==2014.7
wsgiref==0.1.2
import unittest
from celery import Celery
from flask import Flask
from flask.ext.sqlalchemy import SQLAlchemy, event
# this is pretty standard init for celery, as exposed on flask website
def make_celery(app):
celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
# App standard initialization
app = Flask(__name__)
app.config.update(
CELERY_BROKER_URL=None,
CELERY_RESULT_BACKEND=None,
CELERY_ALWAYS_EAGER=True,
SQLALCHEMY_DATABASE_URI='sqlite://'
)
celery = make_celery(app)
db = SQLAlchemy(app)
# We need a model to attach the event to
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
value = db.Column(db.Integer)
# Here is the event. Triggering a delete on the model
# instance will call the task
@event.listens_for(User, "after_delete")
def after_delete(mapper, connection, target):
# right here
# >>> connection.closed
# False
my_task.delay()
# and now...
# >>> connection.closed
# True
# The task doesn't do anything so its body isn't to blame
@celery.task()
def my_task():
# print "my task"
return
class CeleryTestCase(unittest.TestCase):
def setUp(self):
db.create_all()
def tearDown(self):
db.session.remove()
db.drop_all()
def test_delete_task_outside_event(self):
user = User(value=1)
db.session.add(user)
db.session.commit()
my_task.delay()
user.value=2
db.session.add(user)
db.session.commit()
def test_delete_task_in_event(self):
user = User(value=1)
db.session.add(user)
db.session.commit()
db.session.delete(user)
db.session.commit()
if __name__ == '__main__':
unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment