Created
March 23, 2011 19:51
-
-
Save ericmoritz/883817 to your computer and use it in GitHub Desktop.
My nano-framework
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""A simple little hello/goodbye example which takes advantage of urlvars""" | |
from nano import FrontController | |
import routes | |
from routes.middleware import RoutesMiddleware | |
from webob.dec import wsgify | |
from webob.exc import HTTPNotFound | |
from webob import Response | |
# Registry of WSGI apps, keyed by the resource name used in the routes below.
resources = {}


@wsgify
def hello(request):
    """Answer the request with a fixed plain-text greeting."""
    greeting = "Hello, World"
    return Response(greeting, content_type="text/plain")


resources['hello'] = hello
@wsgify
def goodbye(request):
    """Say goodbye to the ``name`` captured from the matched route."""
    farewell = "Goodbye %s" % request.urlvars['name']
    return Response(farewell, content_type="text/plain")


resources['goodbye'] = goodbye
# The front controller looks the matched route's "resource" up in `resources`.
front = FrontController(resources)

# Each route names the resource that should handle it.
mapper = routes.Mapper()
mapper.connect("/", resource="hello")
mapper.connect("/goodbye/{name}", resource="goodbye")

# RoutesMiddleware performs the URL match before handing over to `front`.
application = RoutesMiddleware(front, mapper)
if __name__ == '__main__':
    # Development entry point: serve `application` with the stdlib WSGI server.
    from wsgiref.simple_server import make_server

    print("Listening on port 8081")
    make_server("", 8081, application).serve_forever()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from webob.dec import wsgify | |
from webob import exc | |
import simplejson as json | |
from json_schema_validator.errors import ValidationError | |
from json_schema_validator.schema import Schema | |
from json_schema_validator.validator import Validator | |
# Maps the short names accepted as a "format" route variable to media types.
content_types = dict(
    json="application/json",
    html="text/html",
)
# Request-body parsers, keyed by the request's content type.
decoders = {"application/json": json.loads}

# Response-body serializers, keyed by the negotiated content type.
encoders = {"application/json": json.dumps}
def agent_accepts(request, offers):
    """Choose the response content type for ``request`` from ``offers``.

    When the matched route supplies a ``format`` variable it takes precedence
    over the ``Accept`` header: the format is translated through
    ``content_types`` and used only if the resulting type is one of
    ``offers``.  Without a ``format`` variable the best match for the
    request's ``Accept`` header is used.

    :param request: request exposing ``urlvars`` and ``accept``.
    :param offers: content types the caller is able to produce.
    :returns: the chosen content type.
    :raises exc.HTTPNotAcceptable: when no offered type is acceptable,
        including when ``format`` is unknown or maps to a type that is not
        offered.
    """
    if "format" in request.urlvars:
        # An unrecognised format must end in a 406, not in a KeyError that
        # surfaces as a server error, hence .get() instead of indexing.
        content_type = content_types.get(request.urlvars["format"])
        if content_type not in offers:
            content_type = None
    else:
        content_type = request.accept.best_match(offers)

    if content_type is None:
        raise exc.HTTPNotAcceptable("Offered: %s" % ("; ".join(offers), ))
    return content_type
def allowed(request, methods):
    """Raise ``exc.HTTPMethodNotAllowed`` unless the request's method is in ``methods``.

    The ``methods`` sequence is passed on as the ``allow`` argument of the
    raised error.
    """
    if request.method in methods:
        return
    raise exc.HTTPMethodNotAllowed(allow=methods)
def encode_body(content_type, data, encoders=encoders):
    """Serialize ``data`` with the encoder registered for ``content_type``.

    :param encoders: mapping of content type to a one-argument callable;
        defaults to the module-level ``encoders``.
    :raises ValueError: when ``encoders`` has no entry for ``content_type``.
    """
    try:
        serialize = encoders[content_type]
    except KeyError:
        raise ValueError("Unknown content-type: %s" % (content_type, ))
    else:
        return serialize(data)
def require_user(request):
    """Raise ``exc.HTTPUnauthorized`` when the request has no ``remote_user``."""
    if request.remote_user is not None:
        return
    raise exc.HTTPUnauthorized()
def decode_body(request, json_schema=None, decoders=decoders):
    """Decode the request body and optionally validate it against a schema.

    :param request: request exposing ``content_type`` and ``body``.
    :param json_schema: optional JSON schema; when truthy, the decoded data
        is validated against it.
    :param decoders: mapping of content type to a one-argument callable that
        parses the raw body; defaults to the module-level ``decoders``.
    :returns: the decoded data.
    :raises exc.HTTPUnsupportedMediaType: when no decoder is registered for
        the request's content type.
    :raises exc.HTTPBadRequest: when the decoder raises, or when the decoded
        data fails schema validation.
    """
    content_type = request.content_type
    if content_type not in decoders:
        raise exc.HTTPUnsupportedMediaType(u"%s is not supported" % content_type)

    decoder = decoders[content_type]
    try:
        data = decoder(request.body)
    except Exception as error:
        # Decoders are pluggable, so any failure while parsing the body is
        # reported to the client as a malformed request.
        raise exc.HTTPBadRequest(unicode(error))

    if json_schema:
        try:
            Validator.validate(Schema(json_schema), data)
        except ValidationError as error:
            # Include the schema in the message so the client can see what
            # was expected.
            error_message = u"""%s
Schema:
%s""" % (unicode(error), json.dumps(json_schema).decode("ascii"))
            raise exc.HTTPBadRequest(error_message)

    return data
class FrontController(object):
    """WSGI app that dispatches to the resource named by the route match.

    ``resources`` maps resource names to WSGI apps.  The name to dispatch to
    is read from the ``resource`` entry of ``request.urlvars``.
    """

    def __init__(self, resources):
        self.resources = resources

    @wsgify
    def __call__(self, request):
        route_match = request.urlvars
        # Empty urlvars means no route matched this URL.
        if not route_match:
            raise exc.HTTPNotFound()

        resource_app = self.resources[route_match['resource']]
        return request.get_response(resource_app)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Routes==1.12.3 | |
WebOb==1.1beta1 | |
json-schema-validator==2.1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""This is an example of nesting WSGI apps""" | |
from nano import FrontController | |
from routes.middleware import RoutesMiddleware | |
from greetings import application as greetings_app | |
import routes | |
# Everything below /{version}/greetings/ is delegated to the greetings app;
# the remainder of the URL is captured as path_info.
mapper = routes.Mapper()
mapper.connect("/{version}/greetings/{path_info:.*}", resource="greetings")

nested_apps = {'greetings': greetings_app}
front = FrontController(nested_apps)
application = RoutesMiddleware(front, mapper)
if __name__ == '__main__':
    # Development entry point: serve `application` with the stdlib WSGI server.
    from wsgiref.simple_server import make_server

    print("Listening on port 8081")
    make_server("", 8081, application).serve_forever()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
from webob import Request | |
from webob import exc | |
import nano | |
import simplejson as json | |
class AcceptsTest(unittest.TestCase):
    # Covers nano.agent_accepts for both the Accept header and the "format"
    # route variable.

    def test_accepted(self):
        # The Accept header names the one offered type.
        req = Request.blank("/", accept=["text/html"])
        chosen = nano.agent_accepts(req, ["text/html"])
        self.assertEqual(chosen, "text/html")

    def test_not_accepted(self):
        # The Accept header names a type that is not offered.
        req = Request.blank("/", accept=["application/xml"])
        self.assertRaises(exc.HTTPNotAcceptable,
                          nano.agent_accepts, req, ["text/html"])

    def test_format_accepted(self):
        # The "format" route variable wins over the Accept header.
        req = Request.blank("/",
                            urlvars={"format": "html"},
                            accept=["application/json"])
        chosen = nano.agent_accepts(req, ["text/html"])
        self.assertEqual(chosen, "text/html")

    def test_format_not_accepted(self):
        # The "format" route variable maps to a type that is not offered.
        req = Request.blank("/", urlvars={"format": "json"})
        self.assertRaises(exc.HTTPNotAcceptable,
                          nano.agent_accepts, req, ["text/html"])
class EncodeBodyTest(unittest.TestCase):
    # Covers nano.encode_body with the default and with custom encoders.

    def test_encode_body(self):
        # Round-trip through the default JSON encoder.
        payload = {"foo": "bar"}
        encoded = nano.encode_body("application/json", payload)
        self.assertEqual(json.loads(encoded), payload)

    def test_encode_body_unknown(self):
        # Happens when a developer forgets to guard with agent_accepts and
        # asks for a content type that has no encoder.
        self.assertRaises(ValueError,
                          nano.encode_body, "application/xml", {})

    def test_encode_body_encoders(self):
        # A caller-supplied encoder table replaces the default one.
        payload = {"foo": "bar"}
        custom_encoders = {"text/plain": unicode}
        encoded = nano.encode_body("text/plain", payload,
                                   encoders=custom_encoders)
        self.assertEqual(encoded, unicode(payload))
class DecodeBodyTest(unittest.TestCase):
    # Covers nano.decode_body.
    #
    # decode_body's signature is (request, json_schema=None, decoders=...).
    # It takes no list of offered content types, so nothing may be passed
    # positionally after the request: that value would be bound to
    # json_schema.

    # Schema shared by the schema-validation tests.
    JSON_SCHEMA = {"type": "object",
                   "properties": {
                       "title": {"type": "string"},
                       "pub_date": {"type": "string",
                                    "format": "date-time"}
                   }}

    def _put_request(self, content_type="application/json", **kwargs):
        # Build a PUT request with the given content type and optional body.
        return Request.blank("/",
                             method="PUT",
                             content_type=content_type,
                             **kwargs)

    def test_not_offered(self):
        request = self._put_request(content_type="application/xml")
        self.assertRaises(exc.HTTPUnsupportedMediaType,
                          nano.decode_body,
                          request)

    def test_good_json(self):
        expected = {"title": "test"}
        request = self._put_request(body=json.dumps(expected))
        result = nano.decode_body(request)
        self.assertEqual(result, expected)

    def test_bad_json(self):
        request = self._put_request(body="{")
        self.assertRaises(exc.HTTPBadRequest,
                          nano.decode_body,
                          request)

    def test_valid_schema(self):
        expected = {"title": "Test",
                    "pub_date": "2011-07-13T00:00:00Z"}
        request = self._put_request(body=json.dumps(expected))
        result = nano.decode_body(request, json_schema=self.JSON_SCHEMA)
        self.assertEqual(result, expected)

    def test_invalid_schema(self):
        payload = {"title": "Test",
                   "pub_date": "not a date-time"}
        request = self._put_request(body=json.dumps(payload))
        self.assertRaises(exc.HTTPBadRequest,
                          nano.decode_body,
                          request,
                          json_schema=self.JSON_SCHEMA)
class TestAllowed(unittest.TestCase):
    # Covers nano.allowed.

    def test_is_allowed(self):
        # A listed method passes silently.
        request = Request.blank("/", method="PUT")
        nano.allowed(request, ["GET", "PUT"])

    def test_is_not_allowed(self):
        # An unlisted method raises, and the error carries the allowed methods.
        request = Request.blank("/", method="DELETE")
        try:
            nano.allowed(request, ["GET", "PUT"])
        except exc.HTTPMethodNotAllowed as error:
            self.assertEqual(error.allow, ("GET", "PUT", ))
        else:
            self.fail("the allowed() function succeeded.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment