Last active
December 22, 2015 23:49
-
-
Save techiev2/6549606 to your computer and use it in GitHub Desktop.
Mongoengine Query/Object creation wrapper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""Object utils""" | |
# coding=utf-8 | |
__author__ = 'Sriram Velamur' | |
import sys | |
sys.dont_write_bytecode = True | |
from mongoengine import (ValidationError, NotUniqueError, | |
LookUpError, InvalidQueryError) | |
from re import findall | |
import logging | |
class QueryObject(object): | |
""" | |
Query object wrapper. Wraps around a mongoengine query and provides | |
convenience wrapper methods for API reponses/exceptions. | |
:Todo: Normalize class to be usable as an ORM agnostic wrapper. | |
""" | |
def __init__(self, controller=None, model=None, query=None, meta=None): | |
""" | |
Object creation wrapper init | |
:param model:str Model to create an object instance of. Required | |
:param query: Query data for object instance. Optional | |
:param meta:dict Meta data for filtering query. Optional | |
""" | |
if not model: | |
raise Exception("No model provided to query object") | |
self.result, self.exception, self.model = None, None, None | |
self.query = query or {} | |
self.meta = meta or {} | |
self.controller = controller | |
try: | |
if controller and hasattr(controller, 'settings'): | |
apps = controller.settings.get('APPS', []) | |
for app in apps: | |
if 'models' in __import__(app).__dict__.keys(): | |
app = __import__("%s.models" % app) | |
app_model = getattr(app, 'models') | |
if app_model: | |
self.model_name = model | |
self.model = \ | |
app_model.__dict__.get(self.model_name) | |
else: | |
self.models = __import__('models') | |
self.model_name = model | |
self.model = getattr(self.models, model) | |
self.result = self.model.objects.filter( | |
**self.query) if self.model else [] | |
self.result = self.result.order_by('obj_id') if self.model._fields.get('obj_id') else self.result | |
for (key, val) in self.meta.iteritems(): | |
if hasattr(self.result, key) and \ | |
hasattr(getattr(self.result, key), '__call__'): | |
self.result = getattr(self.result, key).__call__(val) | |
if len(self.result) == 0: | |
self.result = None | |
self.exception = { | |
'status_code': 404 | |
} | |
except ImportError, ie: | |
logging.log(9001, "Import error: %s" % ie.message) | |
self.result = None | |
self.exception = { | |
'status_code': 500, | |
'custom_msg': "Unable to import specified model" | |
} | |
except LookupError, le: | |
logging.log(9001, "Lookup error in model: %s" % le.message) | |
self.result = None | |
self.exception = { | |
'status_code': 400, | |
'custom_msg': le.message | |
} | |
except Exception, b_e: | |
logging.log(9001, "Base Exception: %s" % b_e.message) | |
self.result = None | |
if 'duplicate' in b_e.message: | |
self.exception = { | |
'status_code': 500, | |
'custom_msg': 'Duplicate entity' | |
} | |
elif 'Cannot resolve' in b_e.message: | |
self.exception = { | |
'status_code': 500, | |
'custom_msg': 'Invalid query' | |
} | |
else: | |
self.exception = { | |
'status_code': 500, | |
'custom_msg': str(b_e.message) | |
} | |
if reduce(lambda x, y: x and y, [ | |
hasattr(self.controller, x) for x in ['ui', 'request']]): | |
setattr(controller, 'response', self.json()) | |
# if self.controller: | |
# setattr(self.controller, 'response', | |
# self.exception if self.exception else self.json()) | |
def json(self, fields=None, exclude=None, as_string=False): | |
"""JSON dumper wrapper""" | |
req_args = getattr(self.controller, 'req_args', {}) | |
fields = req_args.get('fields', []) or fields | |
exclude = req_args.get('exclude', []) or exclude | |
start = req_args.get('start', None) | |
end = req_args.get('end', None) | |
try: | |
start = int(start[0]) if start and len(start) == 1 and\ | |
int(end[0]) < self.count else 0 | |
except ValueError: | |
start = 0 | |
try: | |
end = int(end[0]) if end and len(end) == 1 and \ | |
int(end[0]) < self.count else self.count | |
except ValueError: | |
end = self.count | |
result = self.result[start:end] if self.result else self.result | |
return {x:y.json(fields, exclude, as_string) for x, y in | |
enumerate(result) if hasattr(y, 'json')} \ | |
if self.count > 1 else result[0].json(fields, | |
exclude, as_string) if self.count == 1 else \ | |
self.exception if self.exception else [] | |
@property | |
def count(self): | |
"""Object instance counter""" | |
count = 0 | |
if self.result: | |
count = self.result.count() | |
return count | |
def delete(self, **kwargs): | |
"""Object instance delete wrapper""" | |
if self.result and hasattr(self.result, 'delete') \ | |
and hasattr(self.result.delete, '__call__'): | |
try: | |
if self.controller: | |
user = getattr(self.controller, 'user') | |
# if not user: | |
# return { | |
# 'status_code': 401 | |
# } | |
self.result.delete() | |
return { | |
'status_code': 204 | |
} | |
else: | |
self.result.delete() | |
return { | |
'status_code': 204 | |
} | |
except Exception, e: | |
return { | |
'status_code': 202, | |
'message': '{0} object deletion failed'.format( | |
self.model_name) | |
} | |
else: | |
return { | |
'status_code': 404 | |
} | |
def update(self, update_data): | |
"""Update object instance wrapper for QueryObject""" | |
if update_data and self.result: | |
# if not getattr(self.controller, 'user'): | |
# self.response = {'status_code': 401} | |
# return | |
try: | |
query_data = update_data | |
update_kwargs = {'set__%s' % key: val for \ | |
key, val in update_data.iteritems()} | |
self.result.update(**update_kwargs) | |
self.result = QueryObject(self.controller, self.model_name, | |
query=query_data).result | |
except LookUpError, l_e: | |
self.result = None | |
self.exception = { | |
'status_code': 500, | |
'custom_msg': l_e.message | |
} | |
except InvalidQueryError, iq_e: | |
self.result = None | |
self.exception = { | |
'status_code': 422, | |
'custom_msg': iq_e.message[0].replace( | |
"Cannot resolve field", | |
"Invalid field").replace('"', '') | |
} | |
except Exception, exc: | |
self.result = None | |
self.exception = { | |
'status_code': 500, | |
'custom_msg': exc.message | |
} | |
if reduce(lambda x, y: x and y, [ | |
hasattr(self.controller, x) for x in ['ui', 'request']]): | |
setattr(self.controller, 'response', self.json()) | |
class CreateObject(object): | |
def __init__(self, controller=None, model=None, data=None): | |
""" | |
Object creation wrapper init | |
:param model:str Model to create an object instance of | |
:param data: Data for object instance creation | |
""" | |
self.object, self.exception = None, None | |
if not model: | |
raise Exception("No model provided for object creation") | |
if not data: | |
raise Exception("No data provided for object creation") | |
try: | |
if controller and hasattr(controller, 'settings'): | |
apps = controller.settings.get('APPS', []) | |
for app in apps: | |
if 'models' in __import__(app).__dict__.keys(): | |
app = __import__("%s.models" % app) | |
app_model = getattr(app, 'models') | |
if app_model: | |
self.model_name = model | |
self.model= app_model.__dict__.get(self.model_name) | |
else: | |
self.models = __import__('models') | |
self.model_name = model | |
self.model = getattr(self.models, model) | |
self.data = data | |
self.object = self.model(**self.data) | |
self.object.save() | |
except ImportError, ie: | |
raise Exception("Unable to import specified models pack") | |
except ValidationError, v_e: | |
self.object = None | |
self.exception = { | |
'status_code': 500, | |
'custom_msg': '' | |
} | |
message = v_e.message | |
self.exception['custom_msg'] = message.split(') (')[1].rstrip(')') | |
except NotUniqueError, nu_err: | |
dup_msg = 'Duplicate value for {0} field' | |
field = findall(r'.*?\$(\w+)\_.*?', nu_err.message) | |
field = field[0] if field else None | |
self.object = None | |
self.exception = { | |
'status_code': 500, | |
'custom_msg': '' | |
} | |
if field: | |
self.exception['custom_msg'] = dup_msg.format(field) | |
if controller: | |
setattr(controller, 'response', | |
self.exception if self.exception else self.json()) | |
def json(self): | |
return self.object.json() if hasattr(self.object, 'json') else\ | |
self.exception if self.exception else {'status_code': 500} | |
__all__ = ['QueryObject', 'CreateObject'] | |
if __name__ == '__main__': | |
pass | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment