Created
May 24, 2013 14:33
-
-
Save marians/5643933 to your computer and use it in GitHub Desktop.
Untested version of a job queue that relies on MongoDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Untested version of a simple job queue backed by MongoDB.

Usage:

    from pymongo import MongoClient

    client = MongoClient()
    queue = Queue("myqueue", client)
    job = {
        'key': 'foobar',
        'payload': 'Some data needed for the job'
    }
    queue.add(job['key'], job['payload'])
    hasjob = True
    while hasjob:
        thisjob = queue.get()
        if thisjob is not None:
            queue.resolve_job(thisjob['key'])
        else:
            hasjob = False
"""
from pymongo.errors import DuplicateKeyError | |
from datetime import datetime | |
class Queue(object):
    """
    Named job queue stored in a MongoDB collection.

    Each job is one document in the ``queue`` collection, written by this
    class with the fields ``qname`` (the queue name), ``key``, ``status``
    ("OPEN", "IN_PROGRESS", "DONE" or "FAILED"), ``failures``,
    ``last_modified`` and, when given, ``payload``.

    NOTE(review): the collection methods used here (``ensure_index``,
    ``save``, ``find_and_modify``, ``update``, ``Cursor.count``) are the
    legacy PyMongo API; newer PyMongo releases no longer provide them.
    Confirm the driver version this is deployed against.
    """

    # Failure count at which a job is given up on and marked "FAILED".
    MAX_FAILURES = 3

    def __init__(self, name, db):
        """
        Bind the queue to a name and a database handle.

        :param name: queue name; stored in every job as ``qname`` so that
            several queues can share one collection.
        :param db: object whose ``db`` attribute is the database to use
            (with a ``MongoClient`` that is the database called "db").
        """
        self.name = name
        # The original also did `self.config = config`, but no `config`
        # exists in this module, so construction always raised NameError.
        self.db = db.db
        # Implies that your MongoDB collection should be called "queue".
        # The unique index is what makes add() reject duplicate keys.
        self.db.queue.ensure_index([('qname', 1), ('key', 1)], unique=True)

    def add(self, key, payload=None):
        """
        Add a job to the queue. If a job with the given key already exists,
        no error occurs, but the job isn't added.

        :param key: identifier of the job, unique within this queue.
        :param payload: optional data stored with the job; the ``payload``
            field is omitted from the document when this is None.
        """
        job = {
            'qname': self.name,
            'status': 'OPEN',
            'key': key,
            'failures': 0,
            'last_modified': datetime.utcnow()
        }
        if payload is not None:
            job['payload'] = payload
        try:
            self.db.queue.save(job)
        except DuplicateKeyError:
            # Deliberate: adding an existing key is documented as a no-op.
            pass

    def get(self):
        """
        Return a job and mark it as "IN_PROGRESS" at the same time.

        Selection and status change happen in one ``find_and_modify`` call.

        :return: dict with the job's ``key`` and, if the job has one, its
            ``payload``; None when this queue has no "OPEN" job.
        """
        query = {
            'qname': self.name,
            'status': 'OPEN'
        }
        update = {
            '$set': {
                'status': 'IN_PROGRESS',
                'last_modified': datetime.utcnow()
            }
        }
        found = self.db.queue.find_and_modify(query=query,
                                              update=update)
        if found is None:
            # Nothing left to do; callers test the result against None.
            return None
        out = {'key': found['key']}
        if 'payload' in found:
            out['payload'] = found['payload']
        return out

    def __len__(self):
        """
        Returns the number of OPEN jobs
        """
        query = {
            'qname': self.name,
            'status': 'OPEN'
        }
        return self.db.queue.find(query).count()

    def resolve_job(self, key):
        """
        Mark a job as "DONE".

        :param key: key of the job in this queue. The result of the
            underlying call is not inspected, so no error is raised here
            when no job matches.
        """
        query = {
            'qname': self.name,
            'key': key
        }
        update = {
            '$set': {
                'status': 'DONE',
                'last_modified': datetime.utcnow()
            }
        }
        self.db.queue.find_and_modify(query=query,
                                      update=update)

    def mark_failed(self, key_or_element):
        """
        Add 1 to the failure count of a job.
        If the failure count reaches MAX_FAILURES (3), set the job status
        to "FAILED".

        :param key_or_element: either the job key itself or a dict with a
            ``key`` entry, such as the dict returned by get().

        Does nothing when no job with that key exists in this queue.
        """
        if isinstance(key_or_element, dict):
            key = key_or_element['key']
        else:
            key = key_or_element
        query = {
            'qname': self.name,
            'key': key
        }
        job = self.db.queue.find_one(query)
        if job is None:
            # Unknown key: nothing to count a failure against.
            return
        update = {
            '$inc': {
                'failures': 1
            }
        }
        # The stored count is still the old one, hence the "+ 1".
        if job['failures'] + 1 >= self.MAX_FAILURES:
            update['$set'] = {
                'status': 'FAILED'
            }
        self.db.queue.update(
            {'_id': job['_id']},
            update)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment