Skip to content

Instantly share code, notes, and snippets.

@SupermanScott
Created May 14, 2012 01:58
Show Gist options
  • Save SupermanScott/2691263 to your computer and use it in GitHub Desktop.
Save SupermanScott/2691263 to your computer and use it in GitHub Desktop.
My attempt to do idempotent Mongo writes with MongoKit
# -*- coding: utf-8 -*-
# Copyright (c) 2012, Scott Reynolds
# All rights reserved.
from mongokit import Document, UpdateQueryError
from pymongo.errors import OperationFailure
from bson import BSON
import hashlib
class IdempotentDocument(Document):
    """
    Document whose writes are guarded by a content hash.

    On every save the SHA-1 of the BSON-encoded fields (everything except
    ``_id`` and ``_versioned_id``) is stored in ``_versioned_id``. An update
    is only applied while the stored ``_versioned_id`` still equals the value
    this instance held before the save; otherwise ``UpdateQueryError`` is
    raised.
    """
    structure = {
        '_versioned_id': unicode,
    }

    def save(self, validate=None, safe=True, *args, **kwargs):
        """
        Save the document into the db.

        :param validate: ``True`` forces validation; ``None`` validates when
            ``self.skip_validation`` is ``False``; any other value skips
            validation (references are still made when ``use_autorefs`` is
            set).
        :param safe: currently unused by this implementation.
        :param args: currently unused by this implementation.
        :param kwargs: currently unused by this implementation.
        :raises UpdateQueryError: when an update finds no document with the
            previous ``_versioned_id``, or when creating the document fails
            with an ``OperationFailure``.

        If anything fails after the new version has been computed, ``_id``
        and ``_versioned_id`` are put back to what they were before the call,
        so the in-memory document never claims a version that was not
        written.
        """
        old_version = ''
        if '_versioned_id' in self and self['_versioned_id']:
            old_version = self['_versioned_id']

        # Fields that are both hashed and written. `_id` and `_versioned_id`
        # are left out here; `_versioned_id` is added below with its new
        # value, so callers may alter that property and it is still saved as
        # the sha1 of the document.
        # NOTE(review): these values are captured before validation and
        # before the 'bson' custom-type pass, so if either step replaces a
        # top-level value on the document, the replacement is not what gets
        # hashed or written -- confirm this is intended.
        updates = dict(
            (key, value) for key, value in self.iteritems()
            if key not in ('_id', '_versioned_id')
        )

        # Version id is the sha1 of the hashable contents.
        # @TODO: would be nice to make hashable properties be definable on the
        # model.
        new_version = unicode(hashlib.sha1(BSON.encode(updates)).hexdigest())
        updates['_versioned_id'] = new_version

        identity = self._snapshot_identity()
        self['_versioned_id'] = new_version
        try:
            if validate is True or (validate is None and self.skip_validation is False):
                self.validate(auto_migrate=False)
            elif self.use_autorefs:
                self._make_reference(self, self.structure)

            if old_version and old_version != new_version:
                # An update whose content hash differs from the loaded one.
                self._update_existing(old_version, updates)
            elif not old_version:
                # A new document that was not loaded from the database.
                self._create_new(new_version, updates)
            # Otherwise the content hash is unchanged: nothing to write.
        except Exception:
            # Roll back so a retry starts from the same state as this call.
            self._restore_identity(identity)
            raise

    def _snapshot_identity(self):
        """Return the '_id' / '_versioned_id' entries currently present."""
        return dict(
            (key, self[key]) for key in ('_id', '_versioned_id') if key in self
        )

    def _restore_identity(self, snapshot):
        """Reset '_id' / '_versioned_id' to the state held in `snapshot`."""
        for key in ('_id', '_versioned_id'):
            if key in snapshot:
                self[key] = snapshot[key]
            else:
                self.pop(key, None)

    def _update_existing(self, old_version, updates):
        """Apply `updates` only if the stored version is still `old_version`."""
        self._process_custom_type('bson', self, self.structure)
        try:
            query = {'_versioned_id': old_version}
            # The version is a content hash, so another document with the
            # same content carries the same value; pin the write to this
            # document whenever its `_id` is known.
            if '_id' in self:
                query['_id'] = self['_id']
            new_data = self.collection.find_and_modify(
                query, {'$set': updates}, new=True)
        finally:
            # Always convert back, even when the write raised.
            self._process_custom_type('python', self, self.structure)
        if not new_data:
            raise UpdateQueryError("Document has changed since it was last loaded")

    def _create_new(self, new_version, updates):
        """Upsert the document under `_id` == `new_version`."""
        self['_id'] = new_version
        try:
            self.collection.find_and_modify(
                {'_id': new_version, '_versioned_id': new_version},
                {'$set': updates},
                new=True, upsert=True)
        except OperationFailure:
            # Treated as someone else having beaten us to creating it.
            raise UpdateQueryError("Document has already been created")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment