Skip to content

Instantly share code, notes, and snippets.

@RobSpectre
Created April 27, 2017 21:49
Show Gist options
  • Save RobSpectre/b56a3040d872bddbcdd6e545d70708cb to your computer and use it in GitHub Desktop.
Save RobSpectre/b56a3040d872bddbcdd6e545d70708cb to your computer and use it in GitHub Desktop.
Example of using Celery for Quasar Queue
from flask import Flask
from flask import request
from tasks import store_db
# Flask application object; the @app.route decorator below registers views on it.
app = Flask(__name__)
@app.route('/blink', methods=['GET', 'POST'])
def blink():
    """Queue the request's JSON body for storage and acknowledge with 201.

    The body is parsed as JSON regardless of the request's Content-Type
    (``force=True``) and passed unchanged as the single positional
    argument of the ``store_db`` task.

    Returns:
        A ``(body, status)`` tuple: the text "Object Created" and 201.
    """
    payload = request.get_json(force=True)
    # Only enqueue here; the task itself runs asynchronously.
    store_db.apply_async(args=(payload,))
    return "Object Created", 201
# Start the Flask server only when this file is executed directly,
# with debug mode switched off on the application object first.
if __name__ == "__main__":
    app.debug = False
    app.run()
import MySQLdb
from celery import Celery
from celery.signals import worker_process_init
from celery.signals import worker_process_shutdown
import config
# Module-level database connection; None until init_worker() assigns it.
conn = None
# Celery application named 'tasks', using the AMQP broker on localhost
# with the "guest" user.
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@worker_process_init.connect
def init_worker(**kwargs):
    """Open a MySQL connection and store it in the module-level ``conn``.

    Connected to Celery's ``worker_process_init`` signal. Whatever keyword
    arguments arrive with the signal are accepted and ignored. Connection
    settings are read from the ``config`` module.
    """
    global conn
    print('Initializing database connection for worker.')
    connection_settings = {
        'host': config.MYSQL_HOST,
        'port': config.MYSQL_PORT,
        'user': config.MYSQL_USER,
        'passwd': config.MYSQL_PASSWORD,
        'db': config.MYSQL_DATABASE,
    }
    conn = MySQLdb.connect(**connection_settings)
@worker_process_shutdown.connect
def shutdown_worker(**kwargs):
    """Close the module-level database connection, if one was opened.

    Connected to Celery's ``worker_process_shutdown`` signal. Whatever
    keyword arguments arrive with the signal are accepted and ignored.
    Does nothing when ``conn`` is still ``None``.
    """
    global conn
    if conn is None:
        return
    print('Closing database connection for worker.')
    try:
        conn.close()
    finally:
        # Drop the reference even if close() raises, so a closed handle
        # is never left behind in the module global.
        conn = None
@app.task
def store_db(params):
    """Insert one ``(email_address, customer_id)`` row and commit it.

    Args:
        params: Mapping with a ``data`` entry that holds the
            ``email_address`` and ``customer_id`` values to store.

    Raises:
        KeyError: If ``data``, ``email_address`` or ``customer_id`` is
            missing from ``params``.
    """
    record = params['data']
    values = (record['email_address'], record['customer_id'])

    # Only the table name, which is read from config rather than from the
    # request, is interpolated into the SQL text. The row values travel as
    # bound parameters so the driver escapes them; quotes or SQL fragments
    # in the payload can no longer alter the statement.
    statement = "INSERT INTO {0} VALUES (%s, %s)".format(config.MYSQL_TABLE)

    cursor = conn.cursor()
    try:
        cursor.execute(statement, values)
    finally:
        # Release the cursor whether or not the insert succeeded.
        cursor.close()
    conn.commit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment