@christhekeele
Forked from twolfson/README.md
Last active March 17, 2020 19:08
Leverage Flask-SQLAlchemy with Celery

Last update: 2020-03-17

Flask-SQLAlchemy has some nice built-ins (e.g. accessing query directly on model classes). To keep using these niceties inside a Celery worker, we need to set up and tear down the session the same way Flask-SQLAlchemy does under Flask.

Setup

Flask-SQLAlchemy calls create_scoped_session once at startup, which avoids any setup on a per-request basis.

https://github.com/mitsuhiko/flask-sqlalchemy/blob/2.0/flask_sqlalchemy/__init__.py#L668

This means Celery can piggyback off of this initialization.
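
To illustrate why a scoped session needs no per-request setup, here is a toy registry written with the standard library only. It is a sketch of the general pattern, not SQLAlchemy's or Flask-SQLAlchemy's actual implementation; ToySession and ToyScopedSession are hypothetical names, and scoping by thread id is an assumption made for the example.

```python
import threading


class ToySession:
    """Stand-in for a database session (hypothetical, for illustration)."""


class ToyScopedSession:
    """Toy registry: one session per scope, created lazily on first use."""

    def __init__(self, session_factory, scopefunc=threading.get_ident):
        self._factory = session_factory
        self._scopefunc = scopefunc
        self._registry = {}

    def __call__(self):
        # Look up (or lazily create) the session for the current scope
        key = self._scopefunc()
        if key not in self._registry:
            self._registry[key] = self._factory()
        return self._registry[key]

    def remove(self):
        # Discard the current scope's session; the next call makes a new one
        self._registry.pop(self._scopefunc(), None)


# Built once at startup; nothing else is needed per request or per task
scoped = ToyScopedSession(ToySession)

first = scoped()
second = scoped()
same_within_scope = first is second

# A different thread is a different scope, so it gets its own session
other = []
worker = threading.Thread(target=lambda: other.append(scoped()))
worker.start()
worker.join()
different_across_threads = other[0] is not first

# Teardown: after remove(), the same scope starts with a fresh session
scoped.remove()
fresh_after_remove = scoped() is not first
```

Because the registry creates sessions on demand, the only per-task responsibility left is the remove() call at the end, which is what the teardown section addresses.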

Teardown

Flask-SQLAlchemy tears down when we leave the request/application context.

https://github.com/mitsuhiko/flask-sqlalchemy/blob/2.0/flask_sqlalchemy/__init__.py#L747-L753

To create a similar experience, we can run the same teardown when a Celery task ends. The handler checks CELERY_ALWAYS_EAGER so that it does not conflict with Flask's normal teardown when tasks run eagerly inside a request:

from celery.signals import task_postrun

# Assumes `app` (the Flask application) and `db` (the Flask-SQLAlchemy
# instance) are already defined or imported in this module.

def handle_celery_postrun(retval=None, *args, **kwargs):
    """After each Celery task, clean up our db session"""

    # If instructed, commit the session on successful task completion
    # to ensure any pending write activity makes it to the database
    if app.config.get('SQLALCHEMY_COMMIT_ON_TEARDOWN'):
        if not isinstance(retval, Exception):
            db.session.commit()

    # If we aren't in an eager request (where Flask will perform teardown),
    # then purge the session to prevent failed transactions from leaking
    if not app.config.get('CELERY_ALWAYS_EAGER'):
        db.session.remove()

task_postrun.connect(handle_celery_postrun)
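
To check the handler's branching without a running Celery worker or database, the same logic can be exercised against stand-ins. This is a minimal sketch: FakeSession and run_postrun are hypothetical names introduced for the example, and the config is a plain dict rather than a real Flask config.

```python
class FakeSession:
    """Records calls instead of talking to a database (hypothetical)."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def remove(self):
        self.calls.append("remove")


def run_postrun(config, session, retval=None):
    """Same branching as handle_celery_postrun, with dependencies passed in."""
    if config.get("SQLALCHEMY_COMMIT_ON_TEARDOWN"):
        if not isinstance(retval, Exception):
            session.commit()
    if not config.get("CELERY_ALWAYS_EAGER"):
        session.remove()
    return session.calls


commit_on = {"SQLALCHEMY_COMMIT_ON_TEARDOWN": True}

# Worker, task succeeded: commit, then remove
worker_ok = run_postrun(commit_on, FakeSession(), retval=42)

# Worker, task returned an exception: skip the commit, still remove
worker_failed = run_postrun(commit_on, FakeSession(), retval=ValueError("boom"))

# Eager mode: leave the session alone so Flask can tear it down
eager = run_postrun({"CELERY_ALWAYS_EAGER": True}, FakeSession())
```

Passing the config and session in as arguments is only a convenience for testing; the handler in this gist reads them from the module-level app and db.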