Skip to content

Instantly share code, notes, and snippets.

@droot
Last active December 17, 2015 00:10
Show Gist options
  • Save droot/5519223 to your computer and use it in GitHub Desktop.
Save droot/5519223 to your computer and use it in GitHub Desktop.
This gist implements an Image Uploader Service in Python using Google App Engine. There are four components. image.py implements the Image model used to persist image resources. image_service.py implements the image service client, which provides the basic functionality of retrieving, saving, updating, and deleting an image resource. image_rest_handler.py imple…
import datetime
from google.appengine.ext import db
class Image(db.Model):
    """Datastore model that persists a single image resource."""

    # Display name of the image; must be supplied when the entity is created.
    name = db.StringProperty(required=True)
    # Image payload stored as text; exposed to clients under the 'src' key.
    data = db.TextProperty(required=True)
    # Stamped automatically the first time the entity is stored.
    created_at = db.DateTimeProperty(auto_now_add=True)
    # Refreshed automatically on every put() so it tracks the last
    # modification (it was previously declared with auto_now=False and was
    # therefore never populated).
    updated_at = db.DateTimeProperty(auto_now=True)

    def to_dict(self):
        """Return the JSON-serializable representation sent to clients.

        The dict carries the image name, its payload under 'src', and the
        string form of the datastore key under 'id'.
        """
        return {
            'name': self.name,
            'src': self.data,
            'id': str(self.key()),
        }
import json
from google.appengine.ext import db
from datetime import datetime
from base import BaseHandler
import logging
from models.image import Image
import image_service #image service layer
# Module-level logger, named after this module, used by the request handlers.
log = logging.getLogger(__name__)
class ImageRestHandler(BaseHandler):
    """REST handler that exposes Image resources as JSON.

    NOTE(review): every ``self.abort(...)`` call below is assumed to raise
    (as webapp2's ``RequestHandler.abort`` does), so the statements after it
    are not executed -- confirm against BaseHandler.
    """

    def _read_json_object(self):
        """Parse the request body as a JSON object; abort with 400 otherwise."""
        try:
            payload = json.loads(self.request.body)
        except ValueError:
            # json.loads reports malformed input with the builtin ValueError;
            # the json module has no ``ValueError`` attribute of its own.
            payload = None
        if not isinstance(payload, dict):
            # Malformed JSON, or valid JSON that is not an object.
            self.abort(400)
        return payload

    def get(self, id=None):
        """Return one image when ``id`` is given, otherwise a page of images.

        The listing is controlled by the ``start`` (offset, default 0) and
        ``count`` (page size, default 16) query parameters. Responds with 404
        for an unknown or malformed id and 400 for invalid paging values.
        """
        log.info("Image Get request received for %s", id)
        if id:
            try:
                img = image_service.get(id)
            except db.BadKeyError:
                img = None
            if not img:
                self.abort(404)
            return self.render_json(img.to_dict())

        try:
            start = int(self.request.get('start', '0'))
            count = int(self.request.get('count', '16'))
        except ValueError:
            # Non-numeric paging values are a client error, not a crash.
            start = count = -1
        if start < 0 or count < 0:
            self.abort(400)
        images = image_service.multi_get(limit=count, offset=start)
        return self.render_json([img.to_dict() for img in images])

    def post(self, id=None):
        """Create a new image from a JSON body carrying 'name' and 'src'.

        Responds with 400 when the body is not a JSON object, when either
        field is missing, or when the model rejects the supplied values.
        """
        log.info('Image Post request received: %s', self.request.body)
        payload = self._read_json_object()
        name = payload.get('name')
        src = payload.get('src')
        if name is None or src is None:
            # Bad request: missing arguments.
            self.abort(400)
        try:
            img = image_service.create(name=name, data=src)
        except db.BadValueError:
            # The model refused the values (e.g. empty or wrongly typed).
            self.abort(400)
        return self.render_json(img.to_dict())

    def put(self, id=None):
        """Update the 'name' and/or 'src' of an existing image.

        Only those two fields are forwarded to the service layer, so extra
        keys in the body (such as 'id') cannot collide with the positional
        ``id`` argument of ``image_service.update``. Responds with 400 for a
        missing id or an invalid body, and 404 when the image does not exist.
        """
        log.info("Image update request received for %s", id)
        if id is None:
            self.abort(400)
        payload = self._read_json_object()
        try:
            img = image_service.update(
                id, name=payload.get('name'), src=payload.get('src'))
        except db.BadKeyError:
            self.abort(404)
        except db.BadValueError:
            self.abort(400)
        if img is None:
            self.abort(404)
        return self.render_json(img.to_dict())

    def delete(self, id=None):
        """Delete an image and return the representation of what was removed.

        Responds with 400 for a missing id and 404 when the image does not
        exist or the id is not a valid key.
        """
        log.info("Image Delete request received for %s", id)
        if id is None:
            self.abort(400)
        try:
            img = image_service.delete(id)
        except db.BadKeyError:
            img = None
        if not img:
            self.abort(404)
        return self.render_json(img.to_dict())
"""This file implement image service module. This service is responsible for providing the
service abstraction to higher level module like Image REST Controller
"""
from models.image import Image
# Default `limit` used by multi_get() when the caller does not pass one.
DEFAULT_PAGE_SIZE = 10
def get(id):
    """Look up a single Image by its key and return whatever Image.get yields."""
    image = Image.get(id)
    return image
def multi_get(offset = 0, limit = DEFAULT_PAGE_SIZE):
    """Run a query for one page of images, newest first.

    `offset` and `limit` are passed straight through to the query's run().
    """
    newest_first = Image.all().order('-created_at')
    return newest_first.run(limit = limit, offset = offset)
def create(**kwargs):
    """Build an Image from the given keyword arguments, store it, and return it."""
    new_image = Image(**kwargs)
    new_image.put()
    return new_image
def update(id, **kwargs):
    """Apply 'name' and/or 'src' changes to an existing image and store it.

    Returns the stored image, or None when no image is found for `id`.
    Falsy values (missing, None, empty) leave the corresponding field as is.
    """
    image = get(id)
    if not image:
        return None

    # Maps each accepted keyword to the model attribute it updates.
    for keyword, attribute in (('name', 'name'), ('src', 'data')):
        new_value = kwargs.get(keyword, None)
        if new_value:
            setattr(image, attribute, new_value)

    image.put()
    return image
def delete(id):
    """Delete the image stored under `id`.

    Returns the deleted image instance, or None when nothing was found.
    """
    image = Image.get(id)
    if not image:
        return None
    image.delete()
    return image
#!/usr/bin/env python
"""Quick smoke test for the image REST service.

Runs a create / read / update / delete round trip against the service at URL.
"""
import requests
import json
# Base endpoint of the image collection; the __main__ block may override it
# with the first command-line argument.
URL = 'http://localhost:8080/images'
# Payload sent by test_post() to create the image.
IMAGE_CREATE_DATA = { 'name': 'Test Image', 'src': 'data 1'}
# Payload sent by test_put() to update the image.
# NOTE(review): the name looks like a typo for IMAGE_UPDATE_DATA; kept as is
# because test_put() refers to it by this name.
IMAGE_UPDATE_DATE = {'name': 'Test Image2', 'src': 'data 2'}
def test_post():
    """Create an image via POST and return the parsed JSON response.

    Asserts a 200 status and that the response echoes the submitted name and
    src. Success is reported only after every assertion has passed.
    """
    r = requests.post(URL, data=json.dumps(IMAGE_CREATE_DATA))
    assert r.status_code == 200
    resp = json.loads(r.content)
    assert resp.get('name') == IMAGE_CREATE_DATA.get('name')
    assert resp.get('src') == IMAGE_CREATE_DATA.get('src')
    # Single-argument call form prints the same text on Python 2 and 3.
    print("Image POST OK")
    return resp
def test_get(image_data):
    """Fetch the image named by image_data['id'] and return the parsed response.

    Asserts a 200 status and that the returned id matches the requested one.
    """
    image_id = image_data.get('id')
    response = requests.get(URL + "/%s" % (image_id))
    assert response.status_code == 200
    fetched = json.loads(response.content)
    assert fetched.get('id') == image_id
    print("Image GET OK")
    return fetched
def test_put(image_data):
    """Update the image named by image_data['id'] and return the parsed response.

    Asserts a 200 status, an unchanged id, and that name and src now match
    IMAGE_UPDATE_DATE. The server's response is returned (rather than the
    stale input dict) so it agrees with test_post() and test_get().
    """
    image_id = image_data.get('id')
    r = requests.put(URL + "/%s" % (image_id), data=json.dumps(IMAGE_UPDATE_DATE))
    assert r.status_code == 200
    resp = json.loads(r.content)
    assert resp.get('id') == image_id
    assert resp.get('name') == IMAGE_UPDATE_DATE.get('name')
    assert resp.get('src') == IMAGE_UPDATE_DATE.get('src')
    # Single-argument call form prints the same text on Python 2 and 3.
    print("Image PUT OK")
    return resp
def test_delete(image_data):
    """Delete the image named by image_data['id'].

    Asserts a 200 status and that the response reports the same id.
    """
    image_id = image_data.get('id')
    response = requests.delete(URL + "/%s" % (image_id))
    assert response.status_code == 200
    deleted = json.loads(response.content)
    assert deleted.get('id') == image_id
    print("Image DELETE OK")
if __name__ == '__main__':
    import sys
    # Exactly one command-line argument overrides the default service URL.
    if len(sys.argv) == 2:
        URL = sys.argv[1]
    # Round trip: create -> read -> update -> delete, each step feeding the next.
    test_delete(test_put(test_get(test_post())))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment