Skip to content

Instantly share code, notes, and snippets.

@marians
Created May 24, 2013 14:33
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marians/5643933 to your computer and use it in GitHub Desktop.
Save marians/5643933 to your computer and use it in GitHub Desktop.
Untested version of a job queue that relies on MongoDB
"""
Untested version of some job queue
Usage:
from pymongo import MongoClient
db = MongoClient()
queue = Queue("myqueue", db)
job = {
'key': 'foobar',
'payload': 'Some data needed for the job'
}
queue.add(job, job['key'])
hasjob = True
while hasjob:
thisjob = queue.get()
if thisjob is not None:
queue.resolve_job(job['key'])
else:
hasjob = False
"""
from pymongo.errors import DuplicateKeyError
from datetime import datetime
class Queue(object):
def __init__(self, name, db):
self.name = name
self.config = config
self.db = db.db
# implies that you MongoDB collection should be called "queue"
self.db.queue.ensure_index([('qname', 1), ('key', 1)], unique=True)
def add(self, key, payload=None):
"""
Add a job to the queue. If a job with the given key already exists,
no error occurs, but the job isn't added.
"""
job = {
'qname': self.name,
'status': 'OPEN',
'key': key,
'failures': 0,
'last_modified': datetime.utcnow()
}
if payload is not None:
job['payload'] = payload
try:
self.db.queue.save(job)
except DuplicateKeyError:
pass
def get(self):
"""
Return a job and mark it as "IN_PROGRESS" at the same time.
"""
query = {
'qname': self.name,
'status': 'OPEN'
}
update = {
'$set': {
'status': 'IN_PROGRESS',
'last_modified': datetime.utcnow()
}
}
find = self.db.queue.find_and_modify(query=query,
update=update)
out = {'key': find['key']}
if 'payload' in find:
out['payload'] = find['payload']
return out
def __len__(self):
"""
Returns the number of OPEN jobs
"""
query = {
'qname': self.name,
'status': 'OPEN'
}
num = self.db.queue.find(query).count()
return num
def resolve_job(self, key):
"""
Mark a job as "DONE".
"""
query = {
'qname': self.name,
'key': key
}
update = {
'$set': {
'status': 'DONE',
'last_modified': datetime.utcnow()
}
}
self.db.queue.find_and_modify(query=query,
update=update)
def mark_failed(self, key_or_element):
"""
Add 1 to the failure count of a job.
If the failure count reaches 3, set the job status
to "FAILED".
"""
key = None
if type(key_or_element) == dict:
key = key_or_element['key']
else:
key = key_or_element
query = {
'qname': self.name,
'key': key
}
job = self.db.queue.find_one(query)
update = {
'$inc': {
'failures': 1
}
}
if job['failures'] >= 2:
update['$set'] = {
'status': 'FAILED'
}
self.db.queue.update(
{'_id': job['_id']},
update)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment