Skip to content

Instantly share code, notes, and snippets.

@ericmoritz
Created March 23, 2011 19:51
Show Gist options
  • Save ericmoritz/883817 to your computer and use it in GitHub Desktop.
Save ericmoritz/883817 to your computer and use it in GitHub Desktop.
My nano-framework
"""A simple little hello/goodbye example which takes advantage of urlvars"""
from nano import FrontController
import routes
from routes.middleware import RoutesMiddleware
from webob.dec import wsgify
from webob.exc import HTTPNotFound
from webob import Response
resources = {}
@wsgify
def hello(request):
return Response("Hello, World", content_type="text/plain")
resources['hello'] = hello
@wsgify
def goodbye(request):
name = request.urlvars['name']
return Response("Goodbye %s" % name, content_type="text/plain")
resources['goodbye'] = goodbye
mapper = routes.Mapper()
mapper.connect("/", resource="hello")
mapper.connect("/goodbye/{name}", resource="goodbye")
front = FrontController(resources)
application = RoutesMiddleware(front, mapper)
if __name__ == '__main__':
from wsgiref.simple_server import make_server
print "Listening on port 8081"
server = make_server("", 8081, application)
server.serve_forever()
from webob.dec import wsgify
from webob import exc
import simplejson as json
from json_schema_validator.errors import ValidationError
from json_schema_validator.schema import Schema
from json_schema_validator.validator import Validator
content_types = {
"json": "application/json",
"html": "text/html"
}
decoders = {
"application/json": json.loads
}
encoders = {
"application/json": json.dumps
}
def agent_accepts(request, offers):
# If the format is given, always return
content_type = None
if "format" in request.urlvars:
format = request.urlvars.get("format")
content_type = content_types[format]
if content_type not in offers:
content_type = None
else:
content_type = request.accept.best_match(offers)
if content_type is None:
raise exc.HTTPNotAcceptable("Offered: %s" % ("; ".join(offers), ))
else:
return content_type
def allowed(request, methods):
if request.method not in methods:
raise exc.HTTPMethodNotAllowed(allow=methods)
def encode_body(content_type, data, encoders=encoders):
try:
encoder = encoders[content_type]
except KeyError:
raise ValueError("Unknown content-type: %s" % (content_type, ))
return encoder(data)
def require_user(request):
if request.remote_user is None:
raise exc.HTTPUnauthorized()
def decode_body(request, json_schema=None, decoders=decoders):
offers = decoders.keys()
content_type = request.content_type
if content_type not in offers:
raise exc.HTTPUnsupportedMediaType(u"%s is not supported" % content_type)
decoder = decoders[content_type]
try:
data = decoder(request.body)
except Exception, error:
raise exc.HTTPBadRequest(unicode(error))
if json_schema:
try:
valid = Validator.validate(Schema(json_schema), data)
except ValidationError, error:
error_message = u"""%s
Schema:
%s""" % (unicode(error), json.dumps(json_schema).decode("ascii"))
raise exc.HTTPBadRequest(error_message)
return data
class FrontController(object):
def __init__(self, resources):
self.resources = resources
@wsgify
def __call__(self, request):
match = request.urlvars
if match:
inner_app = self.resources[match['resource']]
return request.get_response(inner_app)
else:
raise exc.HTTPNotFound()
Routes==1.12.3
WebOb==1.1beta1
json-schema-validator==2.1
"""This is an example of nesting WSGI apps"""
from nano import FrontController
from routes.middleware import RoutesMiddleware
from greetings import application as greetings_app
import routes
front = FrontController({'greetings': greetings_app})
mapper = routes.Mapper()
mapper.connect("/{version}/greetings/{path_info:.*}", resource="greetings")
application = RoutesMiddleware(front, mapper)
if __name__ == '__main__':
from wsgiref.simple_server import make_server
print "Listening on port 8081"
server = make_server("", 8081, application)
server.serve_forever()
import unittest
from webob import Request
from webob import exc
import nano
import simplejson as json
class AcceptsTest(unittest.TestCase):
def test_accepted(self):
request = Request.blank("/",
accept=["text/html"])
result = nano.agent_accepts(request, ["text/html"])
self.assertEqual(result, "text/html")
def test_not_accepted(self):
request = Request.blank("/",
accept=["application/xml"])
self.assertRaises(exc.HTTPNotAcceptable,
nano.agent_accepts,
request, ["text/html"])
def test_format_accepted(self):
request = Request.blank("/", urlvars={"format": "html"},
accept=["application/json"])
result = nano.agent_accepts(request, ["text/html"])
self.assertEqual(result, "text/html")
def test_format_not_accepted(self):
request = Request.blank("/", urlvars={"format": "json"})
self.assertRaises(exc.HTTPNotAcceptable,
nano.agent_accepts,
request, ["text/html"])
class EncodeBodyTest(unittest.TestCase):
def test_encode_body(self):
expected = {"foo": "bar"}
json_result = nano.encode_body("application/json",
expected)
result = json.loads(json_result)
self.assertEqual(result, expected)
def test_encode_body_unknown(self):
# This occurs when the developer screws up and doesn't use
# agent_accepts to guard against unoffered formats
self.assertRaises(ValueError,
nano.encode_body,
"application/xml", {})
def test_encode_body_encoders(self):
data = {"foo": "bar"}
expected = unicode(data)
result = nano.encode_body("text/plain",
data,
encoders={ "text/plain": unicode })
self.assertEqual(result, expected)
class DecodeBodyTest(unittest.TestCase):
def test_not_offered(self):
request = Request.blank("/",
method="PUT",
content_type="application/xml")
self.assertRaises(exc.HTTPUnsupportedMediaType,
nano.decode_body,
request, ["application/json"])
def test_good_json(self):
expected = {"title": "test"}
body = json.dumps(expected)
request = Request.blank("/",
method="PUT",
content_type="application/json",
body=body)
result = nano.decode_body(request, ["application/json"])
self.assertEqual(result, expected)
def test_bad_json(self):
body = "{"
request = Request.blank("/",
method="PUT",
content_type="application/json",
body=body)
self.assertRaises(exc.HTTPBadRequest,
nano.decode_body,
request, ["application/json"])
def test_valid_schema(self):
json_schema = {"type": "object",
"properties": {
"title": {"type": "string"},
"pub_date": {"type": "string",
"format": "date-time"}
}}
expected = {"title": "Test",
"pub_date": "2011-07-13T00:00:00Z"}
body = json.dumps(expected)
request = Request.blank("/",
method="PUT",
content_type="application/json",
body=body)
result = nano.decode_body(request,
["application/json"],
json_schema=json_schema)
self.assertEqual(result, expected)
def test_invalid_schema(self):
json_schema = {"type": "object",
"properties": {
"title": {"type": "string"},
"pub_date": {"type": "string",
"format": "date-time"}
}}
expected = {"title": "Test",
"pub_date": "not a date-time"}
body = json.dumps(expected)
request = Request.blank("/",
method="PUT",
content_type="application/json",
body=body)
self.assertRaises(exc.HTTPBadRequest,
nano.decode_body,
request, ["application/json"],
json_schema=json_schema)
class TestAllowed(unittest.TestCase):
def test_is_allowed(self):
request = Request.blank("/",
method="PUT")
nano.allowed(request, ["GET", "PUT"])
def test_is_not_allowed(self):
request = Request.blank("/",
method="DELETE")
try:
nano.allowed(request, ["GET", "PUT"])
self.assertTrue(False, "the allowed() function succeeded.")
except exc.HTTPMethodNotAllowed, error:
self.assertEqual(error.allow, ("GET", "PUT", ))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment