Skip to content

Instantly share code, notes, and snippets.

@whitmo
Created May 10, 2010 21:21
Show Gist options
  • Save whitmo/396568 to your computer and use it in GitHub Desktop.
Save whitmo/396568 to your computer and use it in GitHub Desktop.
from UserDict import DictMixin
from decorator import decorator
from formencode import htmlfill
from pylons.decorators import PylonsFormEncodeState, determine_response_charset, encode_formencode_errors
from webob import UnicodeMultiDict, MultiDict
import formencode
import peppercorn
import logging
log = logging.getLogger(__name__)
def peppercorn_encode(mapping, stack=None):
"""
>>> from pprint import pprint as pp
>>> test = dict(list=[dict(name='value1'),
... dict(name='value2')],
... map=dict(name='A Map',
... value='A Value'),
... name='value')
>>> output = form.peppercorn_encode(test)
# may be susceptible to changes in hashing
>>> pp(output)
[('__start__', 'map:mapping'),
('name', 'A Map'),
('value', 'A Value'),
('__end__', 'map:mapping'),
('__start__', 'list:sequence'),
('name', 'value1'),
('name', 'value2'),
('__end__', 'list:sequence'),
('name', 'value')]
"""
if stack is None:
stack = list()
for name, value in mapping.items():
if isinstance(value, dict):
stack.append((peppercorn.START, "%s:%s" %(name, peppercorn.MAPPING)))
stack = peppercorn_encode(value, stack)
stack.append((peppercorn.END, "%s:%s" %(name, peppercorn.MAPPING)))
elif isinstance(value, list):
stack.append((peppercorn.START, "%s:%s" %(name, peppercorn.SEQUENCE)))
for val in value:
stack = peppercorn_encode(val, stack)
stack.append((peppercorn.END, "%s:%s" %(name, peppercorn.SEQUENCE)))
else:
stack.append((name, value))
return stack
def validate(schema=None, validators=None, form=None, decode=peppercorn.parse,
encode=peppercorn_encode, post_only=True, state=None,
on_get=False, **htmlfill_kwargs):
"""
Modified version of pylons.decorators.validate,
works with generic encode / decode functions vs. variable_decode
Validate input either for a FormEncode schema, or individual
validators
Given a form schema or dict of validators, validate will attempt to
validate the schema or validator list.
If validation was successful, the valid result dict will be saved
as ``self.form_result``. Otherwise, the action will be re-run as if
it was a GET, and the output will be filled by FormEncode's
htmlfill to fill in the form field errors.
``schema``
Refers to a FormEncode Schema object to use during validation.
``form``
Method used to display the form, which will be used to get the
HTML representation of the form for error filling.
``post_only``
Boolean that indicates whether or not GET (query) variables
should be included during validation.
.. warning::
``post_only`` applies to *where* the arguments to be
validated come from. It does *not* restrict the form to
only working with post, merely only checking POST vars.
``state``
Passed through to FormEncode for use in validators that utilize
a state object.
``on_get``
Whether to validate on GET requests. By default only POST
requests are validated.
Example::
class SomeController(BaseController):
def create(controller, id):
return render('/myform.mako')
@validate(schema=model.forms.myshema(), form='create')
def update(controller, id):
# Do something with controller.form_result
pass
"""
if state is None:
state = PylonsFormEncodeState
@decorator
def wrapper(func, controller, *args, **kwargs):
"""Decorator Wrapper function"""
request = controller._py_object.request
errors = {}
# Skip the validation if on_get is False and its a GET
if not on_get and request.environ['REQUEST_METHOD'] == 'GET':
return func(controller, *args, **kwargs)
if post_only is False:
raise ValueError('Only post')
params = request.POST
is_unicode_params = isinstance(params, UnicodeMultiDict)
if decode:
log.debug("Running %s on params" %decode)
decoded = decode(params.items())
else:
decoded = params
if schema:
log.debug("Validating against a schema")
try:
controller.form_result = schema.to_python(decoded, state)
except formencode.Invalid, e:
errors = e.unpack_errors()
if validators:
log.debug("Validating against provided validators")
if isinstance(validators, dict):
if not hasattr(controller, 'form_result'):
controller.form_result = {}
for field, validator in validators.iteritems():
try:
controller.form_result[field] = \
validator.to_python(decoded.get(field), state)
except formencode.Invalid, error:
errors[field] = error
if errors:
log.debug("Errors found in validation, parsing form with htmlfill "
"for errors")
request.environ['REQUEST_METHOD'] = 'GET'
controller._py_object.c.form_errors = errors
# If there's no form supplied, just continue with the current
# function call.
if not form:
return func(controller, *args, **kwargs)
request.environ['pylons.routes_dict']['action'] = form
response = controller._dispatch_call()
# XXX: Legacy WSGIResponse support
legacy_response = False
if hasattr(response, 'content'):
form_content = ''.join(response.content)
legacy_response = True
else:
form_content = response
response = controller._py_object.response
# If the form_content is an exception response, return it
if hasattr(form_content, '_exception'):
return form_content
# Ensure htmlfill can safely combine the form_content, params and
# errors variables (that they're all of the same string type)
if not is_unicode_params:
log.debug("Raw string form params: ensuring the '%s' form and "
"FormEncode errors are converted to raw strings for "
"htmlfill", form)
encoding = determine_response_charset(response)
# WSGIResponse's content may (unlikely) be unicode
if isinstance(form_content, unicode):
form_content = form_content.encode(encoding,
response.errors)
# FormEncode>=0.7 errors are unicode (due to being localized
# via ugettext). Convert any of the possible formencode
# unpack_errors formats to contain raw strings
errors = encode_formencode_errors(errors, encoding,
response.errors)
elif not isinstance(form_content, unicode):
log.debug("Unicode form params: ensuring the '%s' form is "
"converted to unicode for htmlfill", form)
encoding = determine_response_charset(response)
form_content = form_content.decode(encoding)
errors_encoded = PopMap(encode(errors))
form_content = htmlfill.render(form_content, defaults=PopMap(params),
errors=errors_encoded, **htmlfill_kwargs)
if legacy_response:
# Let the Controller merge the legacy response
response.content = form_content
return response
else:
return form_content
return func(controller, *args, **kwargs)
return wrapper
class PopMap(DictMixin):
"""
Wraps a multidict but does a pop on the multidict rather than get,
thereby exhausting keys through repeated access
"""
def __init__(self, data):
if isinstance(data, MultiDict):
self.multidict = data
else:
self.multidict = MultiDict(data)
def keys(self):
return self.multidict.keys()
def __getitem__(self, key):
"""
After each item is used once, we want it purged from the
multidict
"""
return self.multidict.__getitem__(key)
def get(self, key, default=None):
return self.multidict.pop(key, default)
def __setitem__(self, key, val):
self.multidict[key] = val
def __delitem__(self, key):
del self.multidict[key]
def copy(self):
return self.multidict.copy()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment