Created
May 10, 2010 21:21
-
-
Save whitmo/396568 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from UserDict import DictMixin | |
from decorator import decorator | |
from formencode import htmlfill | |
from pylons.decorators import PylonsFormEncodeState, determine_response_charset, encode_formencode_errors | |
from webob import UnicodeMultiDict, MultiDict | |
import formencode | |
import peppercorn | |
import logging | |
log = logging.getLogger(__name__) | |
def peppercorn_encode(mapping, stack=None): | |
""" | |
>>> from pprint import pprint as pp | |
>>> test = dict(list=[dict(name='value1'), | |
... dict(name='value2')], | |
... map=dict(name='A Map', | |
... value='A Value'), | |
... name='value') | |
>>> output = form.peppercorn_encode(test) | |
# may be susceptible to changes in hashing | |
>>> pp(output) | |
[('__start__', 'map:mapping'), | |
('name', 'A Map'), | |
('value', 'A Value'), | |
('__end__', 'map:mapping'), | |
('__start__', 'list:sequence'), | |
('name', 'value1'), | |
('name', 'value2'), | |
('__end__', 'list:sequence'), | |
('name', 'value')] | |
""" | |
if stack is None: | |
stack = list() | |
for name, value in mapping.items(): | |
if isinstance(value, dict): | |
stack.append((peppercorn.START, "%s:%s" %(name, peppercorn.MAPPING))) | |
stack = peppercorn_encode(value, stack) | |
stack.append((peppercorn.END, "%s:%s" %(name, peppercorn.MAPPING))) | |
elif isinstance(value, list): | |
stack.append((peppercorn.START, "%s:%s" %(name, peppercorn.SEQUENCE))) | |
for val in value: | |
stack = peppercorn_encode(val, stack) | |
stack.append((peppercorn.END, "%s:%s" %(name, peppercorn.SEQUENCE))) | |
else: | |
stack.append((name, value)) | |
return stack | |
def validate(schema=None, validators=None, form=None, decode=peppercorn.parse, | |
encode=peppercorn_encode, post_only=True, state=None, | |
on_get=False, **htmlfill_kwargs): | |
""" | |
Modified version of pylons.decorators.validate, | |
works with generic encode / decode functions vs. variable_decode | |
Validate input either for a FormEncode schema, or individual | |
validators | |
Given a form schema or dict of validators, validate will attempt to | |
validate the schema or validator list. | |
If validation was successful, the valid result dict will be saved | |
as ``self.form_result``. Otherwise, the action will be re-run as if | |
it was a GET, and the output will be filled by FormEncode's | |
htmlfill to fill in the form field errors. | |
``schema`` | |
Refers to a FormEncode Schema object to use during validation. | |
``form`` | |
Method used to display the form, which will be used to get the | |
HTML representation of the form for error filling. | |
``post_only`` | |
Boolean that indicates whether or not GET (query) variables | |
should be included during validation. | |
.. warning:: | |
``post_only`` applies to *where* the arguments to be | |
validated come from. It does *not* restrict the form to | |
only working with post, merely only checking POST vars. | |
``state`` | |
Passed through to FormEncode for use in validators that utilize | |
a state object. | |
``on_get`` | |
Whether to validate on GET requests. By default only POST | |
requests are validated. | |
Example:: | |
class SomeController(BaseController): | |
def create(controller, id): | |
return render('/myform.mako') | |
@validate(schema=model.forms.myshema(), form='create') | |
def update(controller, id): | |
# Do something with controller.form_result | |
pass | |
""" | |
if state is None: | |
state = PylonsFormEncodeState | |
@decorator | |
def wrapper(func, controller, *args, **kwargs): | |
"""Decorator Wrapper function""" | |
request = controller._py_object.request | |
errors = {} | |
# Skip the validation if on_get is False and its a GET | |
if not on_get and request.environ['REQUEST_METHOD'] == 'GET': | |
return func(controller, *args, **kwargs) | |
if post_only is False: | |
raise ValueError('Only post') | |
params = request.POST | |
is_unicode_params = isinstance(params, UnicodeMultiDict) | |
if decode: | |
log.debug("Running %s on params" %decode) | |
decoded = decode(params.items()) | |
else: | |
decoded = params | |
if schema: | |
log.debug("Validating against a schema") | |
try: | |
controller.form_result = schema.to_python(decoded, state) | |
except formencode.Invalid, e: | |
errors = e.unpack_errors() | |
if validators: | |
log.debug("Validating against provided validators") | |
if isinstance(validators, dict): | |
if not hasattr(controller, 'form_result'): | |
controller.form_result = {} | |
for field, validator in validators.iteritems(): | |
try: | |
controller.form_result[field] = \ | |
validator.to_python(decoded.get(field), state) | |
except formencode.Invalid, error: | |
errors[field] = error | |
if errors: | |
log.debug("Errors found in validation, parsing form with htmlfill " | |
"for errors") | |
request.environ['REQUEST_METHOD'] = 'GET' | |
controller._py_object.c.form_errors = errors | |
# If there's no form supplied, just continue with the current | |
# function call. | |
if not form: | |
return func(controller, *args, **kwargs) | |
request.environ['pylons.routes_dict']['action'] = form | |
response = controller._dispatch_call() | |
# XXX: Legacy WSGIResponse support | |
legacy_response = False | |
if hasattr(response, 'content'): | |
form_content = ''.join(response.content) | |
legacy_response = True | |
else: | |
form_content = response | |
response = controller._py_object.response | |
# If the form_content is an exception response, return it | |
if hasattr(form_content, '_exception'): | |
return form_content | |
# Ensure htmlfill can safely combine the form_content, params and | |
# errors variables (that they're all of the same string type) | |
if not is_unicode_params: | |
log.debug("Raw string form params: ensuring the '%s' form and " | |
"FormEncode errors are converted to raw strings for " | |
"htmlfill", form) | |
encoding = determine_response_charset(response) | |
# WSGIResponse's content may (unlikely) be unicode | |
if isinstance(form_content, unicode): | |
form_content = form_content.encode(encoding, | |
response.errors) | |
# FormEncode>=0.7 errors are unicode (due to being localized | |
# via ugettext). Convert any of the possible formencode | |
# unpack_errors formats to contain raw strings | |
errors = encode_formencode_errors(errors, encoding, | |
response.errors) | |
elif not isinstance(form_content, unicode): | |
log.debug("Unicode form params: ensuring the '%s' form is " | |
"converted to unicode for htmlfill", form) | |
encoding = determine_response_charset(response) | |
form_content = form_content.decode(encoding) | |
errors_encoded = PopMap(encode(errors)) | |
form_content = htmlfill.render(form_content, defaults=PopMap(params), | |
errors=errors_encoded, **htmlfill_kwargs) | |
if legacy_response: | |
# Let the Controller merge the legacy response | |
response.content = form_content | |
return response | |
else: | |
return form_content | |
return func(controller, *args, **kwargs) | |
return wrapper | |
class PopMap(DictMixin): | |
""" | |
Wraps a multidict but does a pop on the multidict rather than get, | |
thereby exhausting keys through repeated access | |
""" | |
def __init__(self, data): | |
if isinstance(data, MultiDict): | |
self.multidict = data | |
else: | |
self.multidict = MultiDict(data) | |
def keys(self): | |
return self.multidict.keys() | |
def __getitem__(self, key): | |
""" | |
After each item is used once, we want it purged from the | |
multidict | |
""" | |
return self.multidict.__getitem__(key) | |
def get(self, key, default=None): | |
return self.multidict.pop(key, default) | |
def __setitem__(self, key, val): | |
self.multidict[key] = val | |
def __delitem__(self, key): | |
del self.multidict[key] | |
def copy(self): | |
return self.multidict.copy() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment