Created
May 6, 2014 10:34
-
-
Save aodag/4ee5a8f7e2d2f9e826ed to your computer and use it in GitHub Desktop.
restful-json wsgi app
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools | |
from sqlalchemy import ( | |
Column, | |
Integer, | |
ForeignKey, | |
Unicode, | |
) | |
from sqlalchemy.orm import ( | |
scoped_session, | |
sessionmaker, | |
relationship, | |
backref, | |
) | |
from sqlalchemy.ext.declarative import declarative_base | |
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method | |
from sqlalchemy.ext.associationproxy import association_proxy | |
from webob.dec import wsgify | |
from webob.exc import ( | |
HTTPMethodNotAllowed, | |
HTTPNotFound, | |
) | |
import simplejson as json | |
Base = declarative_base() | |
DBSession = scoped_session(sessionmaker()) | |
class Person(Base): | |
__tablename__ = 'person' | |
query = DBSession.query_property() | |
id = Column(Integer, primary_key=True) | |
first_name = Column(Unicode(255), nullable=False, | |
default="") | |
last_name = Column(Unicode(255), nullable=False, | |
default="") | |
person_tags = relationship('Tag', | |
backref='person', | |
cascade='all, delete-orphan', | |
cascade_backrefs=True) | |
tags = association_proxy('person_tags', 'name', | |
creator=lambda name: Tag(name=name)) | |
@hybrid_property | |
def full_name(self): | |
return self.last_name + " " + self.first_name | |
@hybrid_method | |
def has_tag(self, tag): | |
return tag in self.tags | |
@has_tag.expression | |
def has_tag(cls, tag): | |
return cls.tags.contains(tag) | |
class Tag(Base): | |
__tablename__ = 'tags' | |
id = Column(Integer, primary_key=True) | |
name = Column(Unicode(255)) | |
person_id = Column(Integer, ForeignKey('person.id')) | |
@classmethod | |
def all_tags(cls): | |
return cls.name.distinct().label('name') | |
@functools.singledispatch | |
def todict(obj): | |
pass | |
@todict.register(Person) | |
def _(p): | |
return dict(first_name=p.first_name, | |
last_name=p.last_name, | |
full_name=p.full_name, | |
id=p.id, | |
tags=list(p.tags)) | |
def get_collection(request): | |
query_tags = request.GET.getall('tags') | |
q = Person.query | |
if query_tags: | |
for tag in query_tags: | |
q = q.filter(Person.tags.contains(tag)) | |
people = q.all() | |
tags = [t.name for t in DBSession.query(Tag.all_tags()).order_by(Tag.name)] | |
request.response.content_type = "application/json" | |
return json.dumps( | |
dict(items=[todict(p) | |
for p in people], | |
count=q.count(), | |
tags=tags)) | |
def post_collection(request): | |
params = json.loads(request.body) | |
person = Person(first_name=params['first_name'], | |
last_name=params['last_name'], | |
tags=params['tags']) | |
Person.query.session.add(person) | |
Person.query.session.flush() | |
person_id = person.id | |
request.response.status = 201 | |
request.response.location = "/" + str(person_id) | |
request.response.content_type = "application/json" | |
return json.dumps(todict(person)) | |
def get_member(request): | |
person_id = request.path_info.strip("/") | |
person = Person.query.filter(Person.id == person_id).first() | |
if person is None: | |
return HTTPNotFound() | |
request.response.content_type = "application/json" | |
return json.dumps(todict(person)) | |
def put_member(request): | |
person_id = request.path_info.strip("/") | |
params = json.loads(request.body) | |
person = Person.query.filter(Person.id == person_id).first() | |
if person is None: | |
return HTTPNotFound() | |
person.first_name = params['first_name'] | |
person.last_name = params['last_name'] | |
person.tags = params['tags'] | |
request.response.content_type = "application/json" | |
return json.dumps(todict(person)) | |
def delete_member(request): | |
person_id = request.path_info.strip("/") | |
person = Person.query.filter(Person.id == person_id).first() | |
if person is None: | |
return HTTPNotFound() | |
Person.query.session.delete(person) | |
return "" | |
@wsgify | |
def application(request): | |
if request.path_info == '/': | |
if request.method == "GET": | |
return get_collection(request) | |
elif request.method == "POST": | |
return post_collection(request) | |
else: | |
return HTTPMethodNotAllowed() | |
else: | |
if request.method == "GET": | |
return get_member(request) | |
elif request.method == "PUT": | |
return put_member(request) | |
elif request.method == "DELETE": | |
return delete_member(request) | |
else: | |
return HTTPMethodNotAllowed() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
SQLAlchemy==0.9.4 | |
WebOb==1.3.1 | |
WebTest==2.0.15 | |
beautifulsoup4==4.3.2 | |
simplejson==3.4.1 | |
six==1.6.1 | |
testfixtures==3.0.2 | |
waitress==0.8.8 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
from testfixtures import compare, Comparison as C | |
import simplejson as json | |
import webtest | |
from webob import Request, Response | |
def setUpModule(): | |
from sqlalchemy import create_engine | |
from main import DBSession, Base | |
engine = create_engine('sqlite:///') | |
Base.metadata.drop_all(bind=engine) | |
Base.metadata.create_all(bind=engine) | |
DBSession.remove() | |
DBSession.configure(bind=engine) | |
class TestPerson(unittest.TestCase): | |
def tearDown(self): | |
from main import DBSession | |
DBSession.rollback() | |
DBSession.remove() | |
def _get_target(self): | |
from main import Person | |
return Person | |
def _make_one(self, *args, **kwargs): | |
return self._get_target()(*args, **kwargs) | |
def test_tags(self): | |
target = self._make_one() | |
compare(target.tags, []) | |
def test_tags_creator(self): | |
from main import Tag | |
target = self._make_one() | |
target.tags.append('tag1') | |
compare(target.tags, ['tag1']) | |
compare(target.person_tags, | |
[C(Tag, name='tag1', strict=False)]) | |
def test_has_tag(self): | |
target = self._get_target() | |
for i in range(10): | |
user = self._make_one(tags=['tag%d' % i]) | |
target.query.session.add(user) | |
result = target.query.filter(target.has_tag('tag1')).first() | |
compare(result, C(target, tags=['tag1'], strict=False)) | |
self.assertTrue(result.has_tag('tag1')) | |
class Test_get_collection(unittest.TestCase): | |
def tearDown(self): | |
from main import DBSession | |
DBSession.rollback() | |
DBSession.remove() | |
def _call_fut(self, *args, **kwargs): | |
from main import get_collection | |
return get_collection(*args, **kwargs) | |
def _make_person(self, *args, **kwargs): | |
from main import Person | |
person = Person(*args, **kwargs) | |
Person.query.session.add(person) | |
return person | |
def test_it(self): | |
request = Request.blank('/') | |
request.response = Response(request=request) | |
self._make_person(first_name="testing", | |
last_name="user", | |
tags=['tag1', 'tag2']) | |
result = self._call_fut(request) | |
compare(json.loads(result), | |
{'count': 1, | |
'items': [ | |
{'first_name': 'testing', | |
'full_name': 'user testing', | |
'id': 1, | |
'last_name': 'user', | |
'tags': ['tag1', 'tag2']}, | |
], | |
'tags': ['tag1', 'tag2']}) | |
def test_it_tags(self): | |
request = Request.blank('/?tags=tag3') | |
request.response = Response(request=request) | |
self._make_person(first_name="testing", | |
last_name="user", | |
tags=['tag1', 'tag2']) | |
self._make_person(first_name="testing", | |
last_name="user2", | |
tags=['tag3']) | |
result = self._call_fut(request) | |
compare(json.loads(result), | |
{'count': 1, | |
'items': [ | |
{'first_name': 'testing', | |
'full_name': 'user2 testing', | |
'id': 2, | |
'last_name': 'user2', | |
'tags': ['tag3']} | |
], | |
'tags': ['tag1', 'tag2', 'tag3']}) | |
class Test_post_collection(unittest.TestCase): | |
def tearDown(self): | |
from main import DBSession | |
DBSession.rollback() | |
DBSession.remove() | |
def _call_fut(self, *args, **kwargs): | |
from main import post_collection | |
return post_collection(*args, **kwargs) | |
def _get_person(self, *args, **kwargs): | |
from main import Person | |
return Person.query.filter_by(*args, **kwargs).first() | |
def test_it(self): | |
from main import Person | |
data = { | |
'first_name': 'testing', | |
'last_name': 'created-user', | |
'tags': ['created'] | |
} | |
request = Request.blank("/", body=json.dumps(data).encode('utf-8')) | |
request.response = Response(request=request) | |
result = self._call_fut(request) | |
compare(request.response, | |
C(Response, | |
status='201 Created', | |
location='/1', | |
strict=False)) | |
compare(json.loads(result), | |
{'full_name': 'created-user testing', | |
'first_name': 'testing', | |
'last_name': 'created-user', | |
'tags': ['created'], | |
'id': 1}) | |
created = self._get_person(id=1) | |
compare(created, | |
C(Person, | |
first_name='testing', | |
last_name='created-user', | |
tags=['created'], | |
strict=False)) | |
class Test_get_member(unittest.TestCase): | |
def tearDown(self): | |
from main import DBSession | |
DBSession.rollback() | |
DBSession.remove() | |
def _call_fut(self, *args, **kwargs): | |
from main import get_member | |
return get_member(*args, **kwargs) | |
def _make_person(self, *args, **kwargs): | |
from main import Person | |
person = Person(*args, **kwargs) | |
Person.query.session.add(person) | |
Person.query.session.flush() | |
return person | |
def test_it(self): | |
person = self._make_person(first_name="already", | |
last_name="exists", | |
tags=['already-exists']) | |
person_id = person.id | |
request = Request.blank("/" + str(person_id)) | |
request.response = Response(request=request) | |
result = self._call_fut(request) | |
compare(json.loads(result), | |
{ | |
'first_name': 'already', | |
'full_name': 'exists already', | |
'id': 1, | |
'last_name': 'exists', | |
'tags': ['already-exists'], | |
}) | |
def test_not_found(self): | |
from webob.exc import HTTPNotFound | |
request = Request.blank("/" + str(100)) | |
result = self._call_fut(request) | |
compare(result, | |
C(HTTPNotFound, | |
status="404 Not Found", | |
strict=False)) | |
class Test_put_member(unittest.TestCase): | |
def tearDown(self): | |
from main import DBSession | |
DBSession.rollback() | |
DBSession.remove() | |
def _call_fut(self, *args, **kwargs): | |
from main import put_member | |
return put_member(*args, **kwargs) | |
def _make_person(self, *args, **kwargs): | |
from main import Person | |
person = Person(*args, **kwargs) | |
Person.query.session.add(person) | |
Person.query.session.flush() | |
return person | |
def _get_person(self, *args, **kwargs): | |
from main import Person | |
return Person.query.filter_by(*args, **kwargs).first() | |
def test_it(self): | |
person = self._make_person(first_name="already", | |
last_name="exists", | |
tags=['already-exists']) | |
person_id = person.id | |
data = { | |
'first_name': 'testing', | |
'last_name': 'updated-user', | |
'tags': ['updated'] | |
} | |
request = Request.blank( | |
"/" + str(person_id), | |
body=json.dumps(data).encode('utf-8')) | |
request.response = Response(request=request) | |
result = self._call_fut(request) | |
compare(json.loads(result), | |
{ | |
'first_name': 'testing', | |
'full_name': 'updated-user testing', | |
'id': 1, | |
'last_name': 'updated-user', | |
'tags': ['updated'], | |
}) | |
updated = self._get_person(id=person_id) | |
compare(updated, | |
C(type(person), | |
first_name='testing', | |
full_name='updated-user testing', | |
last_name='updated-user', | |
tags=['updated'], | |
strict=False)) | |
def test_not_found(self): | |
from webob.exc import HTTPNotFound | |
data = { | |
'first_name': 'testing', | |
'last_name': 'updated-user', | |
'tags': ['updated'] | |
} | |
request = Request.blank( | |
"/" + str(9999), | |
body=json.dumps(data).encode('utf-8')) | |
result = self._call_fut(request) | |
compare(result, | |
C(HTTPNotFound, | |
status="404 Not Found", | |
strict=False)) | |
class Test_delete_member(unittest.TestCase): | |
def tearDown(self): | |
from main import DBSession | |
DBSession.rollback() | |
DBSession.remove() | |
def _call_fut(self, *args, **kwargs): | |
from main import delete_member | |
return delete_member(*args, **kwargs) | |
def _make_person(self, *args, **kwargs): | |
from main import Person | |
person = Person(*args, **kwargs) | |
Person.query.session.add(person) | |
Person.query.session.flush() | |
return person | |
def _get_person(self, *args, **kwargs): | |
from main import Person | |
return Person.query.filter_by(*args, **kwargs).first() | |
def test_it(self): | |
person = self._make_person(first_name="already", | |
last_name="exists", | |
tags=['already-exists']) | |
person_id = person.id | |
request = Request.blank( | |
"/" + str(person_id)) | |
result = self._call_fut(request) | |
existing = self._get_person(id=person_id) | |
self.assertIsNone(existing) | |
def test_not_found(self): | |
from webob.exc import HTTPNotFound | |
request = Request.blank( | |
"/" + str(8989)) | |
result = self._call_fut(request) | |
compare(result, | |
C(HTTPNotFound, | |
status="404 Not Found", | |
strict=False)) | |
class Test_application(unittest.TestCase): | |
def tearDown(self): | |
from main import DBSession | |
DBSession.rollback() | |
DBSession.remove() | |
def test_it(self): | |
from main import application | |
app = webtest.TestApp(application) | |
res = app.get('/') | |
compare(res.json, | |
{'tags': [], 'count': 0, 'items': []}) | |
data = { | |
'first_name': 'testing', | |
'last_name': 'created-user', | |
'tags': ['created'] | |
} | |
res = app.post('/', params=json.dumps(data)) | |
person_url = res.location | |
res = app.get(person_url) | |
compare(res.json, | |
{ | |
'id': 1, | |
'first_name': 'testing', | |
'last_name': 'created-user', | |
'full_name': 'created-user testing', | |
'tags': ['created'] | |
}) | |
data = { | |
'first_name': 'testing', | |
'last_name': 'updated-user', | |
'tags': ['created', 'updated'] | |
} | |
res = app.put(person_url, params=json.dumps(data)) | |
compare(res.json, | |
{ | |
'first_name': 'testing', | |
'last_name': 'updated-user', | |
'tags': ['created', 'updated'], | |
'full_name': 'updated-user testing', | |
'id': 1, | |
}) | |
res = app.get(person_url) | |
compare(res.json, | |
{ | |
'first_name': 'testing', | |
'last_name': 'updated-user', | |
'tags': ['created', 'updated'], | |
'full_name': 'updated-user testing', | |
'id': 1, | |
}) | |
res = app.get('/') | |
compare(res.json, | |
{'count': 1, | |
'items': [{'first_name': 'testing', | |
'full_name': 'updated-user testing', | |
'id': 1, | |
'last_name': 'updated-user', | |
'tags': ['created', 'updated']}], | |
'tags': ['created', 'updated']}) | |
res = app.delete(person_url) | |
res = app.get('/') | |
compare(res.json, | |
{'tags': [], 'count': 0, 'items': []}) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment