Last active
December 24, 2015 04:38
-
-
Save alekseyev/6744818 to your computer and use it in GitHub Desktop.
Simple Mongo ODM
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from somewhere import get_mongo_db | |
# Assumes get_mongo_db() returns a database object that can be indexed by collection name.
class MongoException(Exception):
    """Error raised by the ODM layer in this module.

    The model methods raise it when an operation needs a collection or a
    document id that has not been defined.
    """
class MongoField:
    """Marker type for declaring a persisted attribute on a model class.

    Instances carry no state of their own; they exist so that a class
    attribute can be recognised with ``isinstance(value, MongoField)``.
    """
class MongoMeta(type):
    """Metaclass that gathers ``MongoField`` attributes into ``mongo_fields``.

    Every class created through this metaclass gets a ``mongo_fields`` list
    holding the names of its fields: first the names inherited from the base
    classes (in base order), then the names declared in the class body. Each
    name appears once. On the class itself every field attribute is replaced
    by ``None`` so that instances start with an empty default value.
    """

    def __new__(cls, name, bases, attrs):
        """Create the class and attach its ``mongo_fields`` list.

        Args:
            name: Name of the class being created.
            bases: Tuple of base classes.
            attrs: Namespace produced by the class body. It is not modified.

        Returns:
            The newly created class.
        """
        # Inherited field names come first; skip names already collected so
        # that shared ancestors do not produce duplicates.
        field_names = []
        for base in bases:
            for inherited in getattr(base, 'mongo_fields', []):
                if inherited not in field_names:
                    field_names.append(inherited)

        # Build the final namespace up front instead of creating an empty
        # class and calling setattr afterwards: entries such as
        # ``__classcell__`` (needed for zero-argument ``super()``) must be
        # handed to ``type.__new__`` to take effect.
        namespace = {}
        for attr, value in attrs.items():
            if isinstance(value, MongoField):
                namespace[attr] = None
                # A field redeclared in a subclass keeps its original slot.
                if attr not in field_names:
                    field_names.append(attr)
            else:
                namespace[attr] = value

        new_class = super(MongoMeta, cls).__new__(cls, name, bases, namespace)
        new_class.mongo_fields = field_names
        return new_class
class MongoModel(object, metaclass=MongoMeta):
    """Base class for Mongo models.

    Declare persisted attributes as ``MongoField`` class attributes and set
    ``collection_name`` to the collection that stores the documents::

        class Person(MongoModel):
            collection_name = "people"
            name = MongoField()
            birthdate = MongoField()

    NOTE(review): ``ObjectId`` is used by ``get`` and ``save`` but no import
    for it is visible in this file -- presumably ``from bson import
    ObjectId`` is required; confirm.
    NOTE(review): ``remove`` and ``save`` on the collection look like the
    legacy PyMongo API (deprecated in PyMongo 3, removed in PyMongo 4);
    confirm against the installed driver before upgrading.
    """

    __metaclass__ = MongoMeta  # Python 2 spelling; has no effect on Python 3
    _id = MongoField()
    collection_name = None
    mongo_collection = None

    def __init__(self, *args, **kwargs):
        """Bind the collection (if named) and populate fields from kwargs.

        Positional arguments and keyword arguments that do not match a
        declared field are ignored; fields without a value become ``None``.
        """
        if self.collection_name:
            self.mongo_collection = get_mongo_db()[self.collection_name]
        for field in self.mongo_fields:
            setattr(self, field, kwargs.get(field))

    @classmethod
    def get(cls, _id, default=None):
        """Load the document with the given id and return it as a model.

        Args:
            _id: Value passed to ``ObjectId`` to build the lookup key.
            default: Mapping used to build the instance when no document is
                found. ``None`` (the default) means an empty mapping, which
                yields an instance whose fields are all ``None``.

        Raises:
            MongoException: If the class has no ``collection_name``.
        """
        if not cls.collection_name:
            raise MongoException('Collection is not defined')
        if default is None:
            # Avoid a shared mutable default argument.
            default = {}
        mongo_collection = get_mongo_db()[cls.collection_name]
        mongo_data = mongo_collection.find_one(ObjectId(_id)) or default
        return cls(**mongo_data)

    def delete(self):
        """Remove this document from its collection.

        Raises:
            MongoException: If no collection is bound or ``_id`` is not set.
        """
        # Compare with None explicitly rather than truth-testing the
        # collection object (presumably a PyMongo collection, which may not
        # support bool() -- confirm).
        if self.mongo_collection is None:
            raise MongoException('Collection is not defined')
        if not self._id:
            raise MongoException("ID is not defined")
        self.mongo_collection.remove(self._id)

    def save(self):
        """Write the current field values to the collection.

        After the call ``self._id`` holds the ``_id`` found in the saved
        document dictionary.

        Raises:
            MongoException: If no collection is bound.
        """
        if self.mongo_collection is None:
            raise MongoException('Collection is not defined')
        mongo_data = {f: getattr(self, f) for f in self.mongo_fields}
        if not self._id:
            # No id yet: leave the key out of the document.
            mongo_data.pop('_id', None)
        elif not isinstance(self._id, ObjectId):
            # Needed if we use integers as _id: store them as text.
            # ``str`` is the Python 3 equivalent of the former ``unicode``.
            mongo_data['_id'] = str(self._id)
        self.mongo_collection.save(mongo_data)
        self._id = mongo_data.get('_id')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment