Skip to content

Instantly share code, notes, and snippets.

@yegle
Created November 2, 2012 03:56
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yegle/3998624 to your computer and use it in GitHub Desktop.
Save yegle/3998624 to your computer and use it in GitHub Desktop.
REST API based on Django ORM and web.py
class API(app.page):
pass
class DjangoAPI(API):
model = None
def __init__(self):
self.qs = self.model.objects
self.exclude = []
#self.includes = [field.name for field in self.qs[0]]
try:
self.page_num = int(web.input().get('page', 1))
except ValueError:
self.page_num = 1
try:
self.count = int(web.input().get('count', 10))
except ValueError:
self.count = 10
self.end = self.page_num * self.count
self.start = (self.page_num - 1) * self.count
self.params = dict(web.input().items())
try:
self.batch_mode = bool(web.input().get('_batch', False))
except ValueError:
self.batch_mode = False
self.order_by = web.input().get('_order_by',None)
if self.order_by:
self.qs = self.qs.order_by(*self.order_by.split(','))
del self.params['_order_by']
try:
self.upsert_mode = bool(web.input().get('_upsert', False))
except ValueError:
self.upsert_mode = False
for key in self.params:
if key.endswith('__in'):
self.params[key] = self.params[key].split(',')
try:
self.get_all = bool(web.input().get('_all', False))
except ValueError:
self.get_all = False
# delete internal parameters
for key in ['page', 'count', '_batch', '_upsert', '_all']:
try:
del self.params[key]
except KeyError:
pass
self.log()
def insert(self, data):
try:
self.qs.create(**data)
return 'create'
except Exception as e:
return e
def log(self):
caller = web.ctx.env.get('HTTP_X_API_CALLER', '')
if caller == 'CFEngine':
return
data = dict()
data['method'] = web.ctx.method
data['caller'] = caller
data['ip'] = web.ctx.ip
data['resource'] = self.model.__name__
APILog(**data).save()
def upsert(self, data):
try:
a = self.qs.filter(**data)
if len(a) == 0:
self.qs.create(**data)
return 'create'
else:
a.update(**data)
for obj in a:
obj.save()
return 'update'
except Exception as e:
return e
def convert_to_dict(self, d):
fields = [field.name for field in self.qs.model._meta.fields]
try:
ret = list()
for obj in d:
ret.append(model_to_dict(obj, fields=fields))
except TypeError:
ret = model_to_dict(d, fields=fields)
return ret
#data = serializers.serialize('python', d)
#ret = list()
#for obj in data:
# ret.append(obj['fields'])
#return ret
@mimerender(default='json', json=render.json, csv=render.csv)
def GET(self, pk=None):
if pk:
try:
a = self.qs.get(pk=pk)
except ObjectDoesNotExist as e:
return {
'data': web.notfound(
'The item you are requesting does not exists.'
)
}
else:
if self.get_all:
a = self.qs.filter(**self.params)
else:
a = self.qs.filter(**self.params)[self.start:self.end]
data = self.convert_to_dict(a)
return {'data': data}
@mimerender(default='json', json=render.json, csv=render.csv,)
def POST(self, pk=None):
assert pk is None
if self.batch_mode:
assert 'data' in self.params
data = json.loads(self.params['data'])
try:
def batches(iterable, batch_size):
import itertools
'''
Returns the given iterable split into batches,
of size batch_size.'''
iterable = iter(iterable)
counter = itertools.count()
def ticker(key):
return next(counter) // batch_size
for key, group in itertools.groupby(iterable, ticker):
yield group
result = [self.model(**params) for params in data]
for batch in batches(result, 1000):
self.qs.bulk_create(batch)
return {
'data': {
'msg': '%s inserted' % (len(data),)
}
}
except Exception as e:
return {'data': web.internalerror(str(e))}
else:
if self.upsert_mode:
ret = self.upsert(self.params)
else:
ret = self.insert(self.params)
if ret == 'update' or ret == 'create':
message = 'successfully %sd' % (ret,)
return {'data': {'msg': message},
'header': None}
else:
return {'data': web.internalerror(repr(ret))}
@mimerender(default='json', json=render.json, csv=render.csv,)
def PUT(self, pk=None):
assert pk is not None
try:
self.qs.filter(pk=pk).update(**self.params)
a = self.qs.get(pk=pk)
data = self.convert_to_dict(a)
return {'data': data}
except Exception as e:
return {'data': web.internalerror(str(e))}
@mimerender(default='json', json=render.json, csv=render.csv,)
def DELETE(self, pk=None):
try:
if pk:
self.qs.filter(pk=pk).delete()
return {'data': {'msg': 'successfully deleted'},
'header': None}
else:
self.qs.filter(**self.params).delete()
return {'data': {'msg': 'successfully deleted'},
'header': None}
except Exception as e:
return {'data': web.internalerror(repr(e))}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment