Skip to content

Instantly share code, notes, and snippets.

@techiev2
Last active December 22, 2015 23:49
Show Gist options
  • Save techiev2/6549606 to your computer and use it in GitHub Desktop.
Save techiev2/6549606 to your computer and use it in GitHub Desktop.
Mongoengine Query/Object creation wrapper
#!/usr/bin/env python
"""Object utils"""
# coding=utf-8
__author__ = 'Sriram Velamur'
import sys
sys.dont_write_bytecode = True
from mongoengine import (ValidationError, NotUniqueError,
LookUpError, InvalidQueryError)
from re import findall
import logging
class QueryObject(object):
    """
    Query object wrapper. Wraps around a mongoengine query and provides
    convenience wrapper methods for API responses/exceptions.

    After construction ``result`` holds the query result (or ``None``) and
    ``exception`` holds a dict with a ``status_code`` (and usually a
    ``custom_msg``) describing the failure, or ``None`` on success.

    :Todo: Normalize class to be usable as an ORM agnostic wrapper.
    """

    def __init__(self, controller=None, model=None, query=None, meta=None):
        """
        Resolve the model, run the query and record the outcome.

        :param controller: Optional request controller. When it exposes a
            ``settings`` mapping, models are looked up in the packages listed
            under its ``APPS`` key; when it exposes both ``ui`` and
            ``request`` its ``response`` attribute is set to ``self.json()``.
        :param model:str Name of the model to query. Required
        :param query: Keyword filters passed to ``objects.filter``. Optional
        :param meta:dict Maps result method names to the single argument each
            is called with (applied in dict iteration order). Optional
        :raises Exception: if no model name is given.
        """
        if not model:
            raise Exception("No model provided to query object")
        self.result, self.exception, self.model = None, None, None
        self.query = query or {}
        self.meta = meta or {}
        self.controller = controller
        # Set up front so delete()/update() can always rely on it, even when
        # the model lookup below fails or APPS is empty.
        self.model_name = model
        try:
            self.model = self._resolve_model(controller, model)
            if self.model is None:
                # Reuse the ImportError handler instead of failing later with
                # an opaque "'NoneType' object has no attribute" message.
                raise ImportError(
                    "model %r not found in configured apps" % model)
            self.result = self._run_query()
            if len(self.result) == 0:
                self.result = None
                self.exception = {
                    'status_code': 404
                }
        except ImportError as ie:
            logging.log(9001, "Import error: %s" % self._message(ie))
            self.result = None
            self.exception = {
                'status_code': 500,
                'custom_msg': "Unable to import specified model"
            }
        # NOTE(review): this is the builtin LookupError (KeyError/IndexError),
        # not mongoengine's LookUpError imported by this module - confirm
        # which one was intended.
        except LookupError as le:
            message = self._message(le)
            logging.log(9001, "Lookup error in model: %s" % message)
            self.result = None
            self.exception = {
                'status_code': 400,
                'custom_msg': message
            }
        except Exception as b_e:
            message = self._message(b_e)
            logging.log(9001, "Base Exception: %s" % message)
            self.result = None
            if 'duplicate' in message:
                custom_msg = 'Duplicate entity'
            elif 'Cannot resolve' in message:
                custom_msg = 'Invalid query'
            else:
                custom_msg = message
            self.exception = {
                'status_code': 500,
                'custom_msg': custom_msg
            }
        self._publish_response()

    @staticmethod
    def _message(exc):
        """Return the message text of ``exc`` as a string (Python 2 and 3)."""
        message = getattr(exc, 'message', None) or exc
        if isinstance(message, (str, type(u''))):
            return message
        return str(message)

    @staticmethod
    def _bound(raw, total, default):
        """Parse a single-valued request argument into a slice bound.

        Returns ``default`` unless ``raw`` holds exactly one value that
        converts to an int in the range ``0 <= value < total``.
        """
        if not raw or len(raw) != 1:
            return default
        try:
            value = int(raw[0])
        except (TypeError, ValueError):
            return default
        return value if 0 <= value < total else default

    def _resolve_model(self, controller, model):
        """Return the model class named ``model``, or None if no app has it."""
        if controller and hasattr(controller, 'settings'):
            resolved = None
            for app in controller.settings.get('APPS', []):
                if 'models' not in vars(__import__(app)):
                    continue
                package = __import__("%s.models" % app)
                app_models = getattr(package, 'models')
                if not app_models:
                    continue
                candidate = vars(app_models).get(model)
                # Later apps still take precedence, but an app that lacks the
                # model must not wipe out a match found in an earlier one.
                if candidate is not None:
                    resolved = candidate
            return resolved
        # No app settings available: fall back to a top-level ``models`` module.
        self.models = __import__('models')
        return getattr(self.models, model)

    def _run_query(self):
        """Filter the model's objects by the query and apply the meta calls."""
        result = self.model.objects.filter(**self.query)
        if self.model._fields.get('obj_id'):
            result = result.order_by('obj_id')
        for key, val in self.meta.items():
            method = getattr(result, key, None)
            if callable(method):
                result = method(val)
        return result

    def _publish_response(self):
        """Store ``self.json()`` on the controller if it has ui and request."""
        if all(hasattr(self.controller, attr) for attr in ('ui', 'request')):
            self.controller.response = self.json()

    def json(self, fields=None, exclude=None, as_string=False):
        """
        JSON dumper wrapper.

        ``fields``/``exclude`` and the ``start``/``end`` slice bounds found in
        the controller's ``req_args`` take precedence over the arguments.
        Bounds that are missing, malformed or outside ``0 <= n < count`` fall
        back to the full range.

        :return: a dict keyed by position within the slice when more than one
            object matched, the single object's ``json()`` output when exactly
            one matched, otherwise the recorded exception dict or ``[]``.
        """
        req_args = getattr(self.controller, 'req_args', {}) or {}
        fields = req_args.get('fields', []) or fields
        exclude = req_args.get('exclude', []) or exclude
        # Read the count once: each access may issue a query on a queryset.
        total = self.count
        start = self._bound(req_args.get('start', None), total, 0)
        end = self._bound(req_args.get('end', None), total, total)
        result = self.result[start:end] if self.result else self.result
        if total > 1:
            return {index: item.json(fields, exclude, as_string)
                    for index, item in enumerate(result)
                    if hasattr(item, 'json')}
        if total == 1:
            try:
                only = result[0]
            except IndexError:
                # The requested slice excluded the only match.
                return []
            return only.json(fields, exclude, as_string)
        return self.exception if self.exception else []

    @property
    def count(self):
        """Object instance counter; 0 when there is no result."""
        if not self.result:
            return 0
        if isinstance(self.result, (list, tuple)):
            # A meta call may have produced a plain sequence, whose count()
            # requires an argument; its length is the equivalent value.
            return len(self.result)
        return self.result.count()

    def delete(self, **kwargs):
        """
        Object instance delete wrapper.

        :return: ``{'status_code': 204}`` on success, ``404`` when there is
            nothing deletable, or ``202`` plus a message when deletion raised.
        """
        deleter = getattr(self.result, 'delete', None) if self.result else None
        if not callable(deleter):
            return {
                'status_code': 404
            }
        try:
            deleter()
        except Exception as exc:
            logging.log(9001, "Delete failed: %s" % self._message(exc))
            return {
                'status_code': 202,
                'message': '{0} object deletion failed'.format(
                    self.model_name)
            }
        return {
            'status_code': 204
        }

    def update(self, update_data):
        """
        Update object instance wrapper for QueryObject.

        Applies ``update_data`` as ``set__<field>`` operations to the current
        result, then re-queries using ``update_data`` itself as the filter.

        NOTE(review): the re-query matches every object carrying the updated
        values, not only the ones that were just updated - confirm this is
        intended.

        :param update_data: dict mapping field names to their new values.
        """
        if update_data and self.result:
            try:
                update_kwargs = {'set__%s' % key: val
                                 for key, val in update_data.items()}
                self.result.update(**update_kwargs)
                refreshed = QueryObject(self.controller, self.model_name,
                                        query=update_data)
                self.result = refreshed.result
                # Keep the reason (e.g. 404) if the re-query found nothing.
                self.exception = refreshed.exception
            except LookUpError as l_e:
                self.result = None
                self.exception = {
                    'status_code': 500,
                    'custom_msg': self._message(l_e)
                }
            except InvalidQueryError as iq_e:
                self.result = None
                self.exception = {
                    'status_code': 422,
                    'custom_msg': str(iq_e).replace(
                        "Cannot resolve field",
                        "Invalid field").replace('"', '')
                }
            except Exception as exc:
                self.result = None
                self.exception = {
                    'status_code': 500,
                    'custom_msg': self._message(exc)
                }
        self._publish_response()
class CreateObject(object):
    """
    Object creation wrapper. Instantiates and saves a model instance and
    records validation/uniqueness failures as an exception dict instead of
    raising them.

    After construction ``object`` holds the saved instance (or ``None``) and
    ``exception`` holds a dict with ``status_code`` and ``custom_msg`` keys,
    or ``None`` on success.
    """

    def __init__(self, controller=None, model=None, data=None):
        """
        Object creation wrapper init

        :param controller: Optional request controller. When it exposes a
            ``settings`` mapping, models are looked up in the packages listed
            under its ``APPS`` key; when truthy, its ``response`` attribute is
            set to the exception dict or to ``self.json()``.
        :param model:str Model to create an object instance of
        :param data: Data for object instance creation (keyword arguments
            for the model constructor)
        :raises Exception: if ``model`` or ``data`` is missing, or the models
            package cannot be imported.
        :raises AttributeError: if the model cannot be found.
        """
        self.object, self.exception = None, None
        if not model:
            raise Exception("No model provided for object creation")
        if not data:
            raise Exception("No data provided for object creation")
        self.model_name = model
        self.data = data
        try:
            self.model = self._resolve_model(controller, model)
        except ImportError:
            raise Exception("Unable to import specified models pack")
        if self.model is None:
            # Fail with a clear message rather than "'NoneType' object is not
            # callable" (or a missing ``model`` attribute) further down.
            raise AttributeError(
                "Model %r not found in configured apps" % model)
        try:
            self.object = self.model(**self.data)
            self.object.save()
        except ImportError:
            raise Exception("Unable to import specified models pack")
        except ValidationError as v_e:
            self.object = None
            message = self._message(v_e)
            # Expected shape is "... (Model:id) (field errors)"; keep only the
            # field errors, but fall back to the whole text for other shapes.
            parts = message.split(') (')
            self.exception = {
                'status_code': 500,
                'custom_msg': (parts[1].rstrip(')')
                               if len(parts) > 1 else message)
            }
        except NotUniqueError as nu_err:
            dup_msg = 'Duplicate value for {0} field'
            # Pull the field name out of an index name such as "$field_1".
            field = findall(r'.*?\$(\w+)\_.*?', self._message(nu_err))
            field = field[0] if field else None
            self.object = None
            self.exception = {
                'status_code': 500,
                'custom_msg': dup_msg.format(field) if field else ''
            }
        if controller:
            setattr(controller, 'response',
                    self.exception if self.exception else self.json())

    @staticmethod
    def _message(exc):
        """Return the message text of ``exc`` as a string (Python 2 and 3)."""
        message = getattr(exc, 'message', None) or exc
        if isinstance(message, (str, type(u''))):
            return message
        return str(message)

    def _resolve_model(self, controller, model):
        """Return the model class named ``model``, or None if no app has it."""
        if controller and hasattr(controller, 'settings'):
            resolved = None
            for app in controller.settings.get('APPS', []):
                if 'models' not in vars(__import__(app)):
                    continue
                package = __import__("%s.models" % app)
                app_models = getattr(package, 'models')
                if not app_models:
                    continue
                candidate = vars(app_models).get(model)
                # Later apps still take precedence, but an app that lacks the
                # model must not wipe out a match found in an earlier one.
                if candidate is not None:
                    resolved = candidate
            return resolved
        # No app settings available: fall back to a top-level ``models`` module.
        self.models = __import__('models')
        return getattr(self.models, model)

    def json(self):
        """
        Return the created object's ``json()`` output, else the recorded
        exception dict, else a bare ``{'status_code': 500}``.
        """
        if hasattr(self.object, 'json'):
            return self.object.json()
        return self.exception if self.exception else {'status_code': 500}
# Names exported by a star-import of this module.
__all__ = [
    'QueryObject',
    'CreateObject',
]

# Running the module directly is deliberately a no-op.
if __name__ == '__main__':
    pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment