Created
April 26, 2010 20:39
-
-
Save mdellavo/379881 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""The base Controller API | |
Provides the BaseController class for subclassing. | |
""" | |
import csv
import re
from datetime import date
from pprint import pformat
from StringIO import StringIO

from pylons import request, tmpl_context as c, response
from pylons.controllers import WSGIController
from pylons.controllers.util import abort, redirect
from pylons.templating import render_mako as render
from sqlalchemy import asc, desc
from sqlalchemy.orm.exc import NoResultFound
from webhelpers.paginate import Page

from foo.model import User
from foo.model.meta import Session
class BaseController(WSGIController):
    """Base class for the application's controllers.

    Adds helpers for looking up the current user, fetching a record or
    answering 404, exporting rows as CSV and paginating a query.
    """

    def __call__(self, environ, start_response):
        """Invoke the Controller"""
        # WSGIController.__call__ dispatches to the Controller method
        # the request is routed to. This routing information is
        # available in environ['pylons.routes_dict']
        try:
            return WSGIController.__call__(self, environ, start_response)
        finally:
            # Runs whether or not dispatch raised.
            Session.remove()

    def __dump__(self, data):
        """Return ``data`` pretty-printed, served as ``text/plain``."""
        response.content_type = 'text/plain'
        return pformat(data)

    def __get_user__(self):
        """Return the ``User`` whose email equals ``REMOTE_USER``, or None.

        The result is stored in ``request.environ['app.user']`` so later
        accesses within the same request reuse it. A lookup that finds
        nothing is not cached and is repeated on the next access.
        """
        environ = request.environ
        if not environ.get('app.user') and 'REMOTE_USER' in environ:
            environ['app.user'] = (
                Session.query(User)
                .filter_by(email=environ['REMOTE_USER'])
                .first()
            )
        return environ.get('app.user')

    __user__ = property(__get_user__)

    def __fetch__(self, model, **kwargs):
        """Fetch a record from model or 404 as appropriate.

        :param model: mapped class to query.
        :param kwargs: equality criteria passed to ``filter_by``.
        :returns: the single matching record.
        :raises: whatever ``abort(404)`` raises when an ``id`` criterion
            is None or no row matches. ``Query.one()`` still raises its
            own error when several rows match.
        """
        # The original tested the builtin ``id`` here, which is never
        # None; the check is meant for the ``id`` lookup criterion.
        if 'id' in kwargs and kwargs['id'] is None:
            abort(404)

        try:
            return Session.query(model).filter_by(**kwargs).one()
        except NoResultFound:
            # one() signals "no row" by raising, never by returning None.
            abort(404)

    def __export__(self, filename, seq):
        """Return ``seq`` rendered as CSV and set download headers.

        :param filename: base name of the download; characters other than
            word characters, spaces and hyphens are removed, and today's
            date is appended.
        :param seq: iterable of rows handed to ``csv.writer.writerows``.
        :returns: the CSV document as a string.
        :raises: whatever ``abort(400)`` raises unless the request has
            ``format=csv`` in its query string.
        """
        if request.GET.get('format') != 'csv':
            abort(400)

        buf = StringIO()
        csv.writer(buf).writerows(seq)
        data = buf.getvalue()

        safe_name = re.sub(r'[^\w -]', '', filename)
        ts = date.today().strftime('%Y-%m-%d')

        response.headers['Content-Type'] = 'text/csv'
        # WSGI header values must be strings, not ints.
        response.headers['Content-Length'] = str(len(data))
        # The sanitised name may still contain spaces, so it is quoted.
        response.headers['Content-Disposition'] = (
            'attachment; filename="%s-%s.csv"' % (safe_name, ts)
        )
        # NOTE(review): the standard directive is spelled 'max-age'. The
        # value is kept unchanged because, together with 'Pragma: public',
        # it is presumably a deliberate browser-download workaround;
        # confirm before changing.
        response.headers['Cache-Control'] = 'maxage=1'
        response.headers['Pragma'] = 'public'
        return data

    def __filter__(self, query):
        """Hook for subclasses to narrow ``query``; returns it unchanged."""
        return query

    def __paginate__(self, query, page=1, count=20, order=None, sort='asc'):
        """Order, filter and paginate ``query`` from the request's GET data.

        Defaults that are absent from ``request.GET`` are written into it,
        so ``page``, ``count``, ``sort`` (and ``order`` when given) can be
        read from ``request.GET`` after this call.

        :param query: query to paginate.
        :param page: default page number.
        :param count: default number of items per page.
        :param order: default key into ``self.orderings``, or None.
        :param sort: default direction, ``'asc'`` or ``'desc'``.
        :returns: a ``webhelpers.paginate.Page`` over the resulting query.
        :raises: whatever ``abort(400)`` raises when ``page`` or ``count``
            is not an integer.
        """
        params = request.GET

        if 'page' not in params:
            params['page'] = page
        if 'count' not in params:
            params['count'] = count
        if 'order' not in params and order is not None:
            params['order'] = order
        if 'sort' not in params:
            params['sort'] = sort

        # Reject malformed paging input with 400 instead of an unhandled
        # ValueError/TypeError.
        try:
            page_number = int(params['page'])
            per_page = int(params['count'])
        except (TypeError, ValueError):
            abort(400)

        if 'order' in params:
            ordering = params['order'].lower()
            sort_func = desc if params['sort'].lower() == 'desc' else asc
            # Subclasses that define no ``orderings`` ignore the parameter.
            orderings = getattr(self, 'orderings', {})
            if ordering in orderings:
                query = query.order_by(sort_func(orderings[ordering]))

        query = self.__filter__(query)

        return Page(query, page=page_number, items_per_page=per_page)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment