Skip to content

Instantly share code, notes, and snippets.

@alexanderjulo
Created November 25, 2015 06:10
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save alexanderjulo/913beb668a4f25471f2e to your computer and use it in GitHub Desktop.
flask application factory with celery
from flask import Flask
from celery import Celery
def create_app():
app = Flask(__name__)
TaskBase = celery.Task
class Task(TaskBase):
"""
The usual celery base `Task` does not offer any integration into
flask, which is why we need this modified class. As the modified
class is dependent on the app, it can not just be subclassed,
but has to be dynamically created which is being taken care of
here.
"""
abstract = True
def __call__(self, *args, **kwargs):
# FIXME: actually we only need app context, but flask-babel
# is broken and needs a request context
with app.test_request_context("/"):
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = Task
celery.conf.update(app.config)
if app.config.get('CELERY_BROKER_URL'):
celery.conf['BROKER_URL'] = app.config['CELERY_BROKER_URL']
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment