Skip to content

Instantly share code, notes, and snippets.

@pfote
Created December 3, 2015 12:01
Show Gist options
  • Save pfote/0e6136260f4d76377577 to your computer and use it in GitHub Desktop.
Save pfote/0e6136260f4d76377577 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# whats mysql? MySQLdb?
from mysql.connector import errorcode, connect, Error
class JobQueue:
"""docstrings are missing"""
def __init__(self, dbConfig):
"""docstrings for methods as well"""
try:
self.db = connect(user=dbConfig["user"], password=dbConfig["password"],
host=dbConfig["host"], database=dbConfig["dbname"])
except Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
errormsg = "Access to database denied"
elif err.errno == errorcode.ER_BAD_DB_ERROR:
errormsg = "Database does not exist"
else:
errormsg = err
raise Exception(errormsg)
def __del__(self):
"""probably unnecessary"""
try:
self.db
except NameError:
pass
else:
self.db.close()
def getNext(self, prio="high", jobs=1):
"""locking missing, race condition ahead"""
# query formatting according to DBAPI
query = ("""SELECT queue.id, queue.job_id, queue.prio_level, job.filename, job.classname, job.method
FROM pm_job_queue queue JOIN pm_cronjobs job ON (job.id = queue.job_id)
WHERE queue.prio_level = ?
AND queue.status = 'new' AND job.active = 1
LIMIT %d""" % jobs)
cursor = self.db.cursor()
cursor.execute(query, (prio,))
job = []
# using a dict cursor might be easier
# sure its not cursor.fetchall() ?)
for (id, job_id, prio_level, filename, classname, method) in cursor:
job.append({
"id": id, "job_id": job_id, "prio_level": prio_level,
"filename": filename, "classname": classname,
"method": method
})
cursor.close()
return job
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment