Skip to content

Instantly share code, notes, and snippets.

@aodag
Created May 6, 2014 10:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aodag/4ee5a8f7e2d2f9e826ed to your computer and use it in GitHub Desktop.
Save aodag/4ee5a8f7e2d2f9e826ed to your computer and use it in GitHub Desktop.
RESTful JSON WSGI app
import functools
from sqlalchemy import (
Column,
Integer,
ForeignKey,
Unicode,
)
from sqlalchemy.orm import (
scoped_session,
sessionmaker,
relationship,
backref,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
from sqlalchemy.ext.associationproxy import association_proxy
from webob.dec import wsgify
from webob.exc import (
HTTPMethodNotAllowed,
HTTPNotFound,
)
import simplejson as json
# Declarative base class that every ORM model in this module derives from.
Base = declarative_base()
# Scoped session registry. sessionmaker() is created without a bind here, so
# callers attach an engine later through DBSession.configure(bind=...).
DBSession = scoped_session(sessionmaker())
def _tag_from_name(name):
    """Build the Tag row backing a plain tag name added through Person.tags."""
    return Tag(name=name)


class Person(Base):
    """A person with a first name, a last name and a list of tag names."""

    __tablename__ = 'person'

    # Exposes Person.query as a Query bound to the current DBSession.
    query = DBSession.query_property()

    id = Column(Integer, primary_key=True)
    first_name = Column(Unicode(255), default="", nullable=False)
    last_name = Column(Unicode(255), default="", nullable=False)

    # Tag rows owned by this person; removing one from the list deletes it.
    person_tags = relationship(
        'Tag',
        backref='person',
        cascade='all, delete-orphan',
        cascade_backrefs=True,
    )
    # View of person_tags as plain names; new names are wrapped in Tag rows.
    tags = association_proxy('person_tags', 'name', creator=_tag_from_name)

    @hybrid_property
    def full_name(self):
        """Last name and first name joined by a single space."""
        # Kept as "+" concatenation so the same body also works as a SQL
        # expression when accessed on the class.
        family, given = self.last_name, self.first_name
        return family + " " + given

    @hybrid_method
    def has_tag(self, tag):
        """Return whether this instance carries the tag name *tag*."""
        return tag in self.tags

    @has_tag.expression
    def has_tag(cls, tag):
        """Class-level form of has_tag, usable inside Query.filter()."""
        return cls.tags.contains(tag)
class Tag(Base):
    """A single tag name, linked to a person row through person_id."""

    __tablename__ = 'tags'

    id = Column(Integer, primary_key=True)
    name = Column(Unicode(255))
    person_id = Column(Integer, ForeignKey('person.id'))

    @classmethod
    def all_tags(cls):
        """Return a DISTINCT expression over the name column, labelled 'name'."""
        distinct_names = cls.name.distinct()
        return distinct_names.label('name')
@functools.singledispatch
def todict(obj):
    """Convert *obj* into a JSON-serializable structure.

    This is the generic fallback used for any type that has no converter
    registered via ``todict.register``; it returns None.
    """
    return None
@todict.register(Person)
def _(p):
    """Serialize a Person into a plain dict; tags are copied into a list."""
    return {
        'first_name': p.first_name,
        'last_name': p.last_name,
        'full_name': p.full_name,
        'id': p.id,
        'tags': list(p.tags),
    }
def get_collection(request):
    """Return a JSON listing of people, optionally filtered by tag.

    Each ``tags`` value in the query string adds one more filter, so a
    person is listed only when it carries every requested tag.

    The returned JSON string holds:
      * ``items`` - the matching people, serialized with ``todict``
      * ``count`` - the number of matching people
      * ``tags``  - every distinct tag name in the database ordered by
        name (this list is not narrowed by the filter)

    The response content type is set to ``application/json`` on
    ``request.response``.
    """
    q = Person.query
    for tag in request.GET.getall('tags'):
        q = q.filter(Person.tags.contains(tag))
    people = q.all()
    tags = [t.name for t in DBSession.query(Tag.all_tags()).order_by(Tag.name)]
    request.response.content_type = "application/json"
    # The rows are already loaded, so count them locally rather than issuing
    # a second SELECT COUNT(*) for the same filter.
    return json.dumps(
        dict(items=[todict(p) for p in people],
             count=len(people),
             tags=tags))
def post_collection(request):
    """Create a Person from the JSON request body and return it as JSON.

    The body must be a JSON object with ``first_name``, ``last_name`` and
    ``tags``. On success ``request.response`` gets status 201, a
    ``Location`` of ``/<id>`` and the ``application/json`` content type.
    """
    payload = json.loads(request.body)
    new_person = Person(first_name=payload['first_name'],
                        last_name=payload['last_name'],
                        tags=payload['tags'])
    session = Person.query.session
    session.add(new_person)
    # Flush so the primary key is assigned before it is used for Location.
    session.flush()

    response = request.response
    response.status = 201
    response.location = "/" + str(new_person.id)
    response.content_type = "application/json"
    return json.dumps(todict(new_person))
def get_member(request):
    """Return the person addressed by the request path as a JSON string.

    The id is the path with surrounding slashes removed. Returns an
    HTTPNotFound response object when no such person exists.
    """
    person_id = request.path_info.strip("/")
    found = Person.query.filter(Person.id == person_id).first()
    if found is not None:
        request.response.content_type = "application/json"
        return json.dumps(todict(found))
    return HTTPNotFound()
def put_member(request):
    """Overwrite a person's names and tags from the JSON request body.

    The id is the path with surrounding slashes removed; the body must be
    a JSON object with ``first_name``, ``last_name`` and ``tags``. Returns
    the updated person as a JSON string, or an HTTPNotFound response
    object when no such person exists.
    """
    person_id = request.path_info.strip("/")
    payload = json.loads(request.body)
    target = Person.query.filter(Person.id == person_id).first()
    if target is None:
        return HTTPNotFound()

    # Same assignment order as the fields appear in the payload contract.
    for field in ('first_name', 'last_name', 'tags'):
        setattr(target, field, payload[field])

    request.response.content_type = "application/json"
    return json.dumps(todict(target))
def delete_member(request):
    """Delete the person addressed by the request path.

    Returns an empty string on success, or an HTTPNotFound response
    object when no such person exists.
    """
    person_id = request.path_info.strip("/")
    doomed = Person.query.filter(Person.id == person_id).first()
    if doomed is not None:
        Person.query.session.delete(doomed)
        return ""
    return HTTPNotFound()
@wsgify
def application(request):
    """WSGI entry point: dispatch on request path and HTTP method.

    ``/`` is the collection and accepts GET and POST. Any other path is
    treated as a member and accepts GET, PUT and DELETE. Any other method
    yields a 405 response whose ``Allow`` header lists the methods the
    addressed resource supports.
    """
    if request.path_info == '/':
        handlers = (
            ("GET", get_collection),
            ("POST", post_collection),
        )
    else:
        handlers = (
            ("GET", get_member),
            ("PUT", put_member),
            ("DELETE", delete_member),
        )

    for method, handler in handlers:
        if request.method == method:
            return handler(request)

    # A 405 response is required to advertise the supported methods.
    allowed = ", ".join(method for method, _handler in handlers)
    return HTTPMethodNotAllowed(headers=[('Allow', allowed)])
SQLAlchemy==0.9.4
WebOb==1.3.1
WebTest==2.0.15
beautifulsoup4==4.3.2
simplejson==3.4.1
six==1.6.1
testfixtures==3.0.2
waitress==0.8.8
import unittest
from testfixtures import compare, Comparison as C
import simplejson as json
import webtest
from webob import Request, Response
def setUpModule():
    """Rebuild the schema on a new 'sqlite:///' engine and bind DBSession to it."""
    from sqlalchemy import create_engine
    from main import Base, DBSession

    test_engine = create_engine('sqlite:///')
    metadata = Base.metadata
    metadata.drop_all(bind=test_engine)
    metadata.create_all(bind=test_engine)
    # Drop any existing scoped session before pointing the factory at the
    # new engine.
    DBSession.remove()
    DBSession.configure(bind=test_engine)
class TestPerson(unittest.TestCase):
    """Model-level tests for Person and its tag proxy."""

    def tearDown(self):
        # Discard whatever the test wrote and drop the scoped session.
        from main import DBSession
        DBSession.rollback()
        DBSession.remove()

    def _get_target(self):
        from main import Person
        return Person

    def _make_one(self, *args, **kwargs):
        factory = self._get_target()
        return factory(*args, **kwargs)

    def test_tags(self):
        # A new person starts without tags.
        person = self._make_one()
        compare(person.tags, [])

    def test_tags_creator(self):
        # Appending a name through the proxy creates a matching Tag row.
        from main import Tag
        person = self._make_one()
        person.tags.append('tag1')
        compare(person.tags, ['tag1'])
        expected_rows = [C(Tag, name='tag1', strict=False)]
        compare(person.person_tags, expected_rows)

    def test_has_tag(self):
        # has_tag works both as a query filter and on a loaded instance.
        person_cls = self._get_target()
        for index in range(10):
            tagged = self._make_one(tags=['tag%d' % index])
            person_cls.query.session.add(tagged)
        match = person_cls.query.filter(person_cls.has_tag('tag1')).first()
        compare(match, C(person_cls, tags=['tag1'], strict=False))
        self.assertTrue(match.has_tag('tag1'))
class Test_get_collection(unittest.TestCase):
    """Tests for the get_collection handler, called directly."""

    def tearDown(self):
        # Discard whatever the test wrote and drop the scoped session.
        from main import DBSession
        DBSession.rollback()
        DBSession.remove()

    def _call_fut(self, *args, **kwargs):
        from main import get_collection
        return get_collection(*args, **kwargs)

    def _make_person(self, *args, **kwargs):
        from main import Person
        created = Person(*args, **kwargs)
        Person.query.session.add(created)
        return created

    def test_it(self):
        # Without a filter the single stored person is listed.
        request = Request.blank('/')
        request.response = Response(request=request)
        self._make_person(first_name="testing",
                          last_name="user",
                          tags=['tag1', 'tag2'])
        body = self._call_fut(request)
        expected_person = {
            'first_name': 'testing',
            'full_name': 'user testing',
            'id': 1,
            'last_name': 'user',
            'tags': ['tag1', 'tag2'],
        }
        compare(json.loads(body),
                {'count': 1,
                 'items': [expected_person],
                 'tags': ['tag1', 'tag2']})

    def test_it_tags(self):
        # ?tags=tag3 narrows the items but not the overall tag list.
        request = Request.blank('/?tags=tag3')
        request.response = Response(request=request)
        self._make_person(first_name="testing",
                          last_name="user",
                          tags=['tag1', 'tag2'])
        self._make_person(first_name="testing",
                          last_name="user2",
                          tags=['tag3'])
        body = self._call_fut(request)
        expected_person = {
            'first_name': 'testing',
            'full_name': 'user2 testing',
            'id': 2,
            'last_name': 'user2',
            'tags': ['tag3'],
        }
        compare(json.loads(body),
                {'count': 1,
                 'items': [expected_person],
                 'tags': ['tag1', 'tag2', 'tag3']})
class Test_post_collection(unittest.TestCase):
    """Tests for the post_collection handler, called directly."""

    def tearDown(self):
        # Discard whatever the test wrote and drop the scoped session.
        from main import DBSession
        DBSession.rollback()
        DBSession.remove()

    def _call_fut(self, *args, **kwargs):
        from main import post_collection
        return post_collection(*args, **kwargs)

    def _get_person(self, *args, **kwargs):
        from main import Person
        lookup = Person.query.filter_by(*args, **kwargs)
        return lookup.first()

    def test_it(self):
        # Posting a person sets 201 + Location, echoes it, and stores it.
        from main import Person
        payload = {
            'first_name': 'testing',
            'last_name': 'created-user',
            'tags': ['created']
        }
        raw_body = json.dumps(payload).encode('utf-8')
        request = Request.blank("/", body=raw_body)
        request.response = Response(request=request)

        body = self._call_fut(request)

        expected_response = C(Response,
                              status='201 Created',
                              location='/1',
                              strict=False)
        compare(request.response, expected_response)
        compare(json.loads(body),
                {'full_name': 'created-user testing',
                 'first_name': 'testing',
                 'last_name': 'created-user',
                 'tags': ['created'],
                 'id': 1})
        stored = self._get_person(id=1)
        compare(stored,
                C(Person,
                  first_name='testing',
                  last_name='created-user',
                  tags=['created'],
                  strict=False))
class Test_get_member(unittest.TestCase):
    """Tests for the get_member handler, called directly."""

    def tearDown(self):
        # Discard whatever the test wrote and drop the scoped session.
        from main import DBSession
        DBSession.rollback()
        DBSession.remove()

    def _call_fut(self, *args, **kwargs):
        from main import get_member
        return get_member(*args, **kwargs)

    def _make_person(self, *args, **kwargs):
        from main import Person
        created = Person(*args, **kwargs)
        session = Person.query.session
        session.add(created)
        # Flush so the person has an id the test can put in the URL.
        session.flush()
        return created

    def test_it(self):
        # An existing person is returned as its JSON representation.
        existing = self._make_person(first_name="already",
                                     last_name="exists",
                                     tags=['already-exists'])
        url = "/" + str(existing.id)
        request = Request.blank(url)
        request.response = Response(request=request)
        body = self._call_fut(request)
        expected = {
            'first_name': 'already',
            'full_name': 'exists already',
            'id': 1,
            'last_name': 'exists',
            'tags': ['already-exists'],
        }
        compare(json.loads(body), expected)

    def test_not_found(self):
        # An unknown id yields an HTTPNotFound response object.
        from webob.exc import HTTPNotFound
        request = Request.blank("/" + str(100))
        outcome = self._call_fut(request)
        expected = C(HTTPNotFound,
                     status="404 Not Found",
                     strict=False)
        compare(outcome, expected)
class Test_put_member(unittest.TestCase):
    """Tests for the put_member handler, called directly."""

    def tearDown(self):
        # Discard whatever the test wrote and drop the scoped session.
        from main import DBSession
        DBSession.rollback()
        DBSession.remove()

    def _call_fut(self, *args, **kwargs):
        from main import put_member
        return put_member(*args, **kwargs)

    def _make_person(self, *args, **kwargs):
        from main import Person
        created = Person(*args, **kwargs)
        session = Person.query.session
        session.add(created)
        # Flush so the person has an id the test can put in the URL.
        session.flush()
        return created

    def _get_person(self, *args, **kwargs):
        from main import Person
        lookup = Person.query.filter_by(*args, **kwargs)
        return lookup.first()

    def test_it(self):
        # PUT replaces names and tags, both in the reply and in storage.
        person = self._make_person(first_name="already",
                                   last_name="exists",
                                   tags=['already-exists'])
        person_id = person.id
        payload = {
            'first_name': 'testing',
            'last_name': 'updated-user',
            'tags': ['updated']
        }
        raw_body = json.dumps(payload).encode('utf-8')
        request = Request.blank("/" + str(person_id), body=raw_body)
        request.response = Response(request=request)

        body = self._call_fut(request)

        expected_json = {
            'first_name': 'testing',
            'full_name': 'updated-user testing',
            'id': 1,
            'last_name': 'updated-user',
            'tags': ['updated'],
        }
        compare(json.loads(body), expected_json)
        stored = self._get_person(id=person_id)
        compare(stored,
                C(type(person),
                  first_name='testing',
                  full_name='updated-user testing',
                  last_name='updated-user',
                  tags=['updated'],
                  strict=False))

    def test_not_found(self):
        # An unknown id yields an HTTPNotFound response object.
        from webob.exc import HTTPNotFound
        payload = {
            'first_name': 'testing',
            'last_name': 'updated-user',
            'tags': ['updated']
        }
        raw_body = json.dumps(payload).encode('utf-8')
        request = Request.blank("/" + str(9999), body=raw_body)
        outcome = self._call_fut(request)
        expected = C(HTTPNotFound,
                     status="404 Not Found",
                     strict=False)
        compare(outcome, expected)
class Test_delete_member(unittest.TestCase):
    """Tests for the delete_member handler, called directly."""

    def tearDown(self):
        # Discard whatever the test wrote and drop the scoped session.
        from main import DBSession
        DBSession.rollback()
        DBSession.remove()

    def _call_fut(self, *args, **kwargs):
        from main import delete_member
        return delete_member(*args, **kwargs)

    def _make_person(self, *args, **kwargs):
        from main import Person
        created = Person(*args, **kwargs)
        session = Person.query.session
        session.add(created)
        # Flush so the person has an id the test can put in the URL.
        session.flush()
        return created

    def _get_person(self, *args, **kwargs):
        from main import Person
        lookup = Person.query.filter_by(*args, **kwargs)
        return lookup.first()

    def test_it(self):
        # After the handler runs the person can no longer be queried.
        person = self._make_person(first_name="already",
                                   last_name="exists",
                                   tags=['already-exists'])
        person_id = person.id
        request = Request.blank("/" + str(person_id))
        self._call_fut(request)
        self.assertIsNone(self._get_person(id=person_id))

    def test_not_found(self):
        # An unknown id yields an HTTPNotFound response object.
        from webob.exc import HTTPNotFound
        request = Request.blank("/" + str(8989))
        outcome = self._call_fut(request)
        expected = C(HTTPNotFound,
                     status="404 Not Found",
                     strict=False)
        compare(outcome, expected)
class Test_application(unittest.TestCase):
    """End-to-end test driving the WSGI application through webtest."""

    def tearDown(self):
        # Discard whatever the test wrote and drop the scoped session.
        from main import DBSession
        DBSession.rollback()
        DBSession.remove()

    def test_it(self):
        # Walk one person through list -> create -> read -> update -> delete.
        from main import application
        app = webtest.TestApp(application)
        empty_listing = {'tags': [], 'count': 0, 'items': []}

        # The collection starts out empty.
        res = app.get('/')
        compare(res.json, empty_listing)

        # Create a person, then read it back from the returned Location.
        create_payload = {
            'first_name': 'testing',
            'last_name': 'created-user',
            'tags': ['created']
        }
        res = app.post('/', params=json.dumps(create_payload))
        person_url = res.location
        res = app.get(person_url)
        compare(res.json,
                {
                    'id': 1,
                    'first_name': 'testing',
                    'last_name': 'created-user',
                    'full_name': 'created-user testing',
                    'tags': ['created']
                })

        # Update it; the PUT reply and a later GET must both agree.
        update_payload = {
            'first_name': 'testing',
            'last_name': 'updated-user',
            'tags': ['created', 'updated']
        }
        updated_person = {
            'first_name': 'testing',
            'last_name': 'updated-user',
            'tags': ['created', 'updated'],
            'full_name': 'updated-user testing',
            'id': 1,
        }
        res = app.put(person_url, params=json.dumps(update_payload))
        compare(res.json, updated_person)
        res = app.get(person_url)
        compare(res.json, updated_person)

        # The collection now lists the updated person and both tags.
        res = app.get('/')
        compare(res.json,
                {'count': 1,
                 'items': [{'first_name': 'testing',
                            'full_name': 'updated-user testing',
                            'id': 1,
                            'last_name': 'updated-user',
                            'tags': ['created', 'updated']}],
                 'tags': ['created', 'updated']})

        # Deleting the person empties the collection again.
        res = app.delete(person_url)
        res = app.get('/')
        compare(res.json, empty_listing)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment