Skip to content

Instantly share code, notes, and snippets.

@droot
Created June 23, 2013 00:38
Show Gist options
  • Save droot/5843252 to your computer and use it in GitHub Desktop.
Save droot/5843252 to your computer and use it in GitHub Desktop.
This gist presents quick code for a REST handler for the /emails service: POST /emails is a webhook that consumes requests from an email service when a new email is received; GET /emails returns email contents; GET /emails/idxxxxx returns information about the email content referenced by id idxxxxx.
import json
from google.appengine.ext import db
from datetime import datetime
from base import BaseHandler
import logging
from models.emails import EmailContent
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class EmailContentRestHandler(BaseHandler):
    """REST handler for stored emails.

    ``get`` serves either a single email (looked up by datastore key) or the
    ten most recently created ones; ``post`` stores a new email from a JSON
    webhook payload. Responses are produced with ``self.render_json`` and
    errors with ``self.abort``, both provided by the base handler.
    """

    # JSON fields that every webhook payload must carry.
    _REQUIRED_FIELDS = ('from_address', 'plain_message_reply_only', 'subject')

    def get(self, id=None):
        """Return one email, or the latest emails, as JSON.

        Args:
            id: optional datastore key string of an ``EmailContent``. When
                empty or omitted, the 10 most recently created emails are
                returned as a list.

        Aborts with 404 when ``id`` is malformed, refers to an entity of
        another kind, or matches no stored email.
        """
        log.info("EmailContent Get request received for %s", id)
        if not id:
            latest = EmailContent.all().order('-created_at')
            listing = [item.to_dict() for item in latest.run(limit=10, offset=0)]
            return self.render_json(listing)

        try:
            email_content = EmailContent.get(id)
        except (db.BadKeyError, db.KindError):
            # A malformed key, or a valid key of a different kind, cannot
            # identify an email: treat both as "not found" instead of a 500.
            email_content = None
        if not email_content:
            return self.abort(404)
        return self.render_json(email_content.to_dict())

    def _validate_request(self, input_val):
        """Parse a webhook body and check that it carries the required fields.

        Args:
            input_val: raw request body, expected to hold a JSON object.

        Returns:
            The decoded dict when the body is a JSON object whose
            ``from_address``, ``plain_message_reply_only`` and ``subject``
            entries are all present and non-null; ``None`` otherwise.
        """
        try:
            content_map = json.loads(input_val)
        except (TypeError, ValueError):
            # Not JSON at all.
            return None
        if not isinstance(content_map, dict):
            # Valid JSON, but a list/string/number has no fields to read;
            # calling .get() on it would raise AttributeError.
            return None
        if any(content_map.get(field) is None for field in self._REQUIRED_FIELDS):
            return None
        return content_map

    def post(self, id=None):
        """Store a new email from a webhook POST and return it as JSON.

        The request body must be a JSON object with ``from_address``,
        ``subject`` and ``plain_message_reply_only`` entries. ``id`` is
        accepted but not used.

        Aborts with 403 when the payload is invalid or its values are
        rejected by the ``EmailContent`` model.
        """
        input_val = '%s' % (self.request.body)
        log.info('Webhook Post request received: %s', input_val)
        content_map = self._validate_request(input_val)
        if not content_map:
            # NOTE(review): 400 would be the conventional status for a bad
            # payload; 403 is kept so existing webhook senders see no change.
            return self.abort(403)  # invalid request

        try:
            email_content = EmailContent(
                from_address=content_map.get('from_address'),
                subject=content_map.get('subject'),
                text_body=content_map.get('plain_message_reply_only'))
        except db.BadValueError as err:
            # Property validation failed (e.g. empty or non-string value):
            # the payload is unusable, so reject it instead of crashing.
            log.warning('Webhook Post rejected by model validation: %s', err)
            return self.abort(403)  # invalid request
        email_content.save()
        return self.render_json(email_content.to_dict())
import datetime
from google.appengine.ext import db
class EmailContent(db.Model):
    """Datastore model for one received email."""

    # Sender address.
    from_address = db.StringProperty(required=True)
    # Subject line.
    subject = db.StringProperty(required=True)
    # Message text.
    text_body = db.TextProperty(required=True)
    # Set automatically when the entity is first created.
    created_at = db.DateTimeProperty(auto_now_add=True)

    def to_dict(self):
        """Return a JSON-serializable summary of this email.

        The dict holds the entity key (as ``id``), ``from_address`` and
        ``subject``; ``text_body`` and ``created_at`` are not included.
        """
        return {
            'id': str(self.key()),
            # Text values are returned as stored: wrapping them in str()
            # raises UnicodeEncodeError under Python 2 as soon as a sender
            # or subject contains a non-ASCII character.
            'from_address': self.from_address,
            'subject': self.subject,
        }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment