Created
November 14, 2013 19:46
-
-
Save diorcety/7473146 to your computer and use it in GitHub Desktop.
Mimeparser
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
RESTful request-body parser selection using the HTTP Content-Type header.
""" | |
__version__ = '0.5.3' | |
__author__ = 'Martin Blech <martinblech@gmail.com>' | |
__license__ = 'MIT' | |
import mimeparse | |
from functools import wraps | |
import re | |
class MimeParserException(Exception):
    """Error type raised by this module's own helpers and decorator.

    Used for unknown or duplicate short names, mime types without a
    registered parser, and parser failures.
    """
# Short names accepted as keyword arguments by the decorator.
# Structured data formats.
XML = 'xml'
JSON = 'json'
JSONP = 'jsonp'
BSON = 'bson'
YAML = 'yaml'
# Markup and plain text.
XHTML = 'xhtml'
HTML = 'html'
TXT = 'txt'
# Tabular formats.
CSV = 'csv'
TSV = 'tsv'
# Feeds and metadata.
RSS = 'rss'
RDF = 'rdf'
ATOM = 'atom'
# Playlists.
M3U = 'm3u'
PLS = 'pls'
XSPF = 'xspf'
# Calendar and geographic formats.
ICAL = 'ical'
KML = 'kml'
KMZ = 'kmz'

# Maps each short name to the tuple of mime types it stands for.  The
# first entry of a tuple is the one used when a short name is given as an
# explicit override.
_MIME_TYPES = {
    XML: (
        'application/xml',
        'text/xml',
        'application/x-xml',
    ),
    JSON: ('application/json',),
    JSONP: ('application/javascript',),
    BSON: ('application/bson',),
    YAML: (
        'application/x-yaml',
        'text/yaml',
    ),
    XHTML: ('application/xhtml+xml',),
    HTML: ('text/html',),
    TXT: ('text/plain',),
    CSV: ('text/csv',),
    TSV: ('text/tab-separated-values',),
    RSS: ('application/rss+xml',),
    RDF: ('application/rdf+xml',),
    ATOM: ('application/atom+xml',),
    M3U: (
        'audio/x-mpegurl',
        'application/x-winamp-playlist',
        'audio/mpeg-url',
        'audio/mpegurl',
    ),
    PLS: ('audio/x-scpls',),
    XSPF: ('application/xspf+xml',),
    ICAL: ('text/calendar',),
    KML: ('application/vnd.google-earth.kml+xml',),
    KMZ: ('application/vnd.google-earth.kmz',),
}
def register_mime(shortname, mime_types):
    """
    Make a new short name available to the decorator.

    ``mime_types`` is stored as given under ``shortname`` in the module's
    mime type table.  A short name can only be registered once; a second
    attempt raises MimeParserException.

    Usage example:
        mimeparser.register_mime('svg', ('application/x-svg', 'application/svg+xml',))
    After this you can do:
        @mimeparser.mimeparser(svg=parse_svg)
        def POST(...
            ...
    """
    if shortname not in _MIME_TYPES:
        _MIME_TYPES[shortname] = mime_types
        return
    message = '"%s" has already been registered' % shortname
    raise MimeParserException(message)
def _get_mime_types(shortname):
    """Return the mime types registered under ``shortname``.

    Raises MimeParserException when the short name is not in the table.
    """
    try:
        known_types = _MIME_TYPES[shortname]
    except KeyError:
        message = 'No known mime types for "%s"' % shortname
        raise MimeParserException(message)
    return known_types
def _get_short_mime(mime):
    """Return the first registered short name whose mime types include ``mime``.

    Raises MimeParserException when no registered short name lists it.
    """
    for candidate in _MIME_TYPES:
        if mime in _MIME_TYPES[candidate]:
            return candidate
    raise MimeParserException('No short mime for type "%s"' % mime)
def _best_mime(supported, accept_string=None):
    """Pick the entry of ``supported`` that best matches ``accept_string``.

    Matching is delegated to ``mimeparse.best_match``.  Without a header
    string (``None``) there is nothing to match and ``None`` is returned.
    """
    if accept_string is not None:
        return mimeparse.best_match(supported, accept_string)
    return None
# Splits a Vary header value into its comma-separated field names.
VARY_SEPARATOR = re.compile(r',\s*')


def _fix_headers(headers, content_type):
    """Return a copy of ``headers`` with Vary and Content-Type ensured.

    ``headers`` is an iterable of ``(name, value)`` pairs.  An existing
    Vary header gets ``,Accept`` appended unless it already lists it
    (compared case-insensitively); if there is no Vary header at all,
    ``Vary: Accept`` is added.  ``Content-Type: <content_type>`` is added
    only when no Content-Type header is present.  The input is not
    modified.
    """
    result = []
    has_vary = False
    has_content_type = False
    for name, value in headers:
        lowered = name.lower()
        if lowered == 'vary':
            has_vary = True
            listed = VARY_SEPARATOR.split(value.strip().lower())
            if 'accept' not in listed:
                value = value + ',Accept'
        elif lowered == 'content-type':
            has_content_type = True
        result.append((name, value))
    if not has_vary:
        result.append(('Vary', 'Accept'))
    if not has_content_type:
        result.append(('Content-Type', content_type))
    return result
class MimeParserBase(object):
    """Framework-independent core of the ``mimeparser`` decorator.

    An instance is a decorator factory: calling it with parser callables
    returns a decorator that, on every call of the decorated function,
    selects a parser from the request's Content-Type and applies it to the
    request data.  Access to the request itself goes through the
    underscore-prefixed hook methods at the bottom of the class, which
    subclasses override for a concrete web framework.

    The ``global_*`` constructor arguments are fallbacks used whenever the
    corresponding per-decorator argument is not given.
    """

    def __init__(self, global_default=None, global_override_arg_idx=None,
                 global_override_input_key=None, global_charset=None,
                 global_unsupported_media_type_callback=None):
        self.global_default = global_default
        self.global_override_arg_idx = global_override_arg_idx
        self.global_override_input_key = global_override_input_key
        self.global_charset = global_charset
        self.global_unsupported_media_type_callback = \
            global_unsupported_media_type_callback

    def __call__(self, default=None, override_arg_idx=None,
                 override_input_key=None, charset=None,
                 unsupported_media_type_callback=None,
                 **parsers):
        """
        Main mimeparser decorator. Usage::

            @mimeparser(default='xml', override_arg_idx=-1,
                        override_input_key='format', <parsers>)
            POST(self, ...) (or PUT, etc.)

        Each keyword in ``parsers`` maps a registered short name (``json``,
        ``xml``, ...) to a callable that takes the raw request data and
        returns the parsed data.  Before the decorated function runs, the
        selected parser is applied to ``self._get_data()`` and its result is
        stored with ``self._set_data()``; the original data is put back once
        the decorated function has finished.  The decorated function's
        return value is passed through unchanged.

        The mime type is chosen in this order:

        1. the positional argument at ``override_arg_idx`` (a short name),
           if an index is configured and the argument is truthy;
        2. the request parameter named ``override_input_key``;
        3. the best match of the Content-Type header among the supported
           mime types;
        4. the default mime type, when there is no Content-Type header.

        If a Content-Type header is present but matches nothing and
        ``unsupported_media_type_callback`` is set, the callback is invoked
        with ``(content_type_header, supported)`` and must return a
        ``(content_type, entity)`` pair, which is sent as a
        ``415 Unsupported Media Type`` response without calling the
        decorated function.  Without a callback the default parser is used.

        While the decorated function runs, the context variables
        ``mimeparser_shortmime``, ``mimeparser_mime`` and
        ``mimeparser_parser`` are set through ``_set_context_var``.

        ``default`` names the short name to prefer; when neither it nor
        ``global_default`` is set, the first parser in the internal
        parser table is the default.  ``charset`` is accepted (with its
        global fallback) but is not used by this implementation.

        Example::

            @mimeparser(
                default = 'json',
                override_input_key = 'format',
                json = json.loads,
                yaml = yaml_parser,
            )
            def POST(self, param):
                ...

        Raises ValueError if no parser is given, or if ``default`` names a
        type for which no parser was passed.  Raises MimeParserException
        for unknown short names and, at call time, when the parser fails.
        """
        if not parsers:
            raise ValueError('need at least one parser')

        def get_parser(mime):
            try:
                return parser_dict[mime]
            except KeyError:
                raise MimeParserException('No parser for mime "%s"' % mime)

        if not default:
            default = self.global_default
        # 0 is a valid positional index, so only fall back to the global
        # setting when no index was given at all.
        if override_arg_idx is None:
            override_arg_idx = self.global_override_arg_idx
        if not override_input_key:
            override_input_key = self.global_override_input_key
        if not charset:
            charset = self.global_charset
        if not unsupported_media_type_callback:
            unsupported_media_type_callback = \
                self.global_unsupported_media_type_callback

        supported = list()
        parser_dict = dict()
        for shortname, parser in parsers.items():
            for mime in _get_mime_types(shortname):
                supported.append(mime)
                parser_dict[mime] = parser
        if default:
            default_mimes = _get_mime_types(default)
            # default mime types should be last in the supported list
            # (which means highest priority to mimeparse)
            for mime in reversed(default_mimes):
                supported.remove(mime)
                supported.append(mime)
            default_mime = default_mimes[0]
            default_parser = get_parser(default_mime)
        else:
            default_mime, default_parser = next(iter(parser_dict.items()))

        def wrap(target):
            @wraps(target)
            def wrapper(*args, **kwargs):
                mime = None
                shortmime = None
                if override_arg_idx is not None:
                    shortmime = args[override_arg_idx]
                if not shortmime and override_input_key:
                    shortmime = self._get_request_parameter(
                        override_input_key)
                if shortmime:
                    mime = _get_mime_types(shortmime)[0]
                content_type_header = self._get_content_type_header()
                if not mime:
                    if content_type_header:
                        mime = _best_mime(supported, content_type_header)
                    else:
                        mime = default_mime
                if mime:
                    parser = get_parser(mime)
                else:
                    if unsupported_media_type_callback:
                        content_type, entity = \
                            unsupported_media_type_callback(
                                content_type_header, supported)
                        return self._make_response(
                            entity,
                            (('Content-Type', content_type),),
                            '415 Unsupported Media Type')
                    else:
                        mime, parser = default_mime, default_parser
                if not shortmime:
                    shortmime = _get_short_mime(mime)
                try:
                    old_data = self._get_data()
                    new_data = parser(old_data)
                    self._set_data(new_data)
                except Exception as e:
                    # Exception rather than BaseException: interpreter
                    # exits and Ctrl-C must not be reported as parse errors.
                    raise MimeParserException(
                        'Parsing error: %s' % (str(e)))
                context_vars = dict(
                    mimeparser_shortmime=shortmime,
                    mimeparser_mime=mime,
                    mimeparser_parser=parser)
                for key, value in context_vars.items():
                    self._set_context_var(key, value)
                try:
                    result = target(*args, **kwargs)
                finally:
                    # undo everything done for this call, even on error
                    for key in context_vars:
                        self._clear_context_var(key)
                    self._set_data(old_data)
                return result
            return wrapper

        return wrap

    # Hooks.  The base implementations are inert placeholders; subclasses
    # override them to talk to a concrete framework.

    def _get_request_parameter(self, key, default=None):
        """Return the request parameter ``key``, or ``default``."""
        return default

    def _get_content_type_header(self, default=None):
        """Return the request's Content-Type header, or ``default``.

        This is the hook the decorator actually calls.
        """
        return default

    def _get_accept_header(self, default=None):
        """Return ``default``; kept so existing callers keep working.

        The decorator does not call this hook.
        """
        return default

    def _set_context_var(self, key, value):
        """Expose ``value`` under ``key`` for the duration of a call."""
        pass

    def _clear_context_var(self, key):
        """Remove a variable previously set with ``_set_context_var``."""
        pass

    def _get_data(self):
        """Return the raw request data."""
        return ""

    def _set_data(self, data):
        """Replace the request data seen by the decorated function."""
        pass

    def _make_response(self, content, headers, status):
        """Build the response returned for an unsupported media type."""
        return content
# web.py implementation (only defined when web.py can be imported)
try:
    import web
except ImportError:
    pass
else:
    class WebPyMimeParser(MimeParserBase):
        """MimeParserBase hooks implemented on top of web.py."""

        def _get_request_parameter(self, key, default=None):
            """Look ``key`` up in ``web.input()``."""
            request_input = web.input()
            return request_input.get(key, default)

        def _get_content_type_header(self, default=None):
            """Read ``CONTENT_TYPE`` from the web.py request environment."""
            environ = web.ctx.env
            return environ.get('CONTENT_TYPE', default)

        def _set_context_var(self, key, value):
            """Store ``value`` under ``key`` on ``web.ctx``."""
            web.ctx[key] = value

        def _clear_context_var(self, key):
            """Delete ``key`` from ``web.ctx``."""
            del web.ctx[key]

        def _get_data(self):
            """Return ``web.data()``."""
            return web.data()

        def _set_data(self, data):
            """Assign ``data`` to ``web.ctx.data``."""
            web.ctx.data = data

        def _make_response(self, content, headers, status):
            """Set status and headers on the web.py response; return content."""
            web.ctx.status = status
            for header_name, header_value in headers:
                web.header(header_name, header_value)
            return content
# unit tests
if __name__ == "__main__":
    try:
        import unittest2 as unittest
    except ImportError:
        import unittest

    class TestMimeParser(MimeParserBase):
        """In-memory MimeParserBase: every hook reads or writes attributes."""

        def __init__(self, data=None, content_type=None,
                     request_parameters=None, *args, **kwargs):
            super(TestMimeParser, self).__init__(*args, **kwargs)
            self.request_parameters = request_parameters or {}
            self.content_type_header = content_type
            self.ctx = {}
            self.headers = {}
            self.data = data

        def _get_request_parameter(self, key, default=None):
            return self.request_parameters.get(key, default)

        def _get_content_type_header(self, default=None):
            return self.content_type_header

        def _set_context_var(self, key, value):
            self.ctx[key] = value

        def _clear_context_var(self, key):
            del self.ctx[key]

        def _get_data(self):
            return self.data

        def _set_data(self, data):
            self.data = data

        def _make_response(self, content, headers, status):
            self.status = status
            for k, v in headers:
                self.headers[k] = v
            return content

    class MimeParserTests(unittest.TestCase):
        # The decorator restores the original request data once the
        # handler returns, so parsed data has to be observed from inside
        # the handler: the handlers below return the data they see and the
        # tests assert on that return value.

        def test_single_variant(self):
            mimeparser = TestMimeParser('test', 'text/xml')
            result = mimeparser(
                xml=lambda x: '<xml>%s</xml>' % x
            )(lambda: mimeparser.data)()
            self.assertEqual(result, '<xml>test</xml>')
            # raw data is back in place after the call
            self.assertEqual(mimeparser.data, 'test')

        def test_no_parsers(self):
            self.assertRaises(ValueError, TestMimeParser())

        def test_select_variant(self):
            mimeparser = TestMimeParser()
            handler = mimeparser(
                default='txt',
                override_input_key='mime',
                txt=lambda x: 'txt:%s' % x,
                xml=lambda x: 'xml:%s' % x,
                json=lambda x: 'json:%s' % x,
                html=lambda x: 'html:%s' % x,
            )(lambda x: mimeparser.data)
            # no Content-Type header: the default parser is used
            mimeparser.data = 'default'
            result = handler(None)
            self.assertEqual(result, 'txt:default')
            mimeparser.data = 'a'
            mimeparser.content_type_header = 'application/xml'
            result = handler(None)
            self.assertEqual(result, 'xml:a')
            mimeparser.data = 'b'
            mimeparser.content_type_header = 'application/json'
            result = handler(None)
            self.assertEqual(result, 'json:b')
            # the request parameter wins over the Content-Type header
            mimeparser.data = 'c'
            mimeparser.request_parameters['mime'] = 'html'
            result = handler(None)
            self.assertEqual(result, 'html:c')

        def test_decorated_function_name(self):
            def vanilla_function(): pass
            mimeparser = TestMimeParser()
            decorated_function = mimeparser(xml=None)(vanilla_function)
            self.assertEqual(vanilla_function.__name__,
                             decorated_function.__name__)

        def test_unsupported_media_type(self):
            mimeparser = TestMimeParser()
            # default behavior, pick default even if not supported
            handler = mimeparser(
                default='json',
                xml=lambda x: 'xml:%s' % x,
                json=lambda x: 'json:%s' % x,
            )(lambda x: mimeparser.data)
            mimeparser.content_type_header = 'text/plain'
            mimeparser.data = 'default'
            result = handler(None)
            self.assertEqual(result, 'json:default')
            # optional: fail with 415
            handler = mimeparser(
                unsupported_media_type_callback=lambda _, sup: (
                    'text/plain',
                    'Available Content Types: ' + ', '.join(sup)),
                default='json',
                xml=lambda x: 'xml:%s' % x,
                json=lambda x: 'json:%s' % x,
            )(lambda x: dict(x=x))
            mimeparser.content_type_header = 'text/plain'
            mimeparser.data = 'default'
            result = handler(None)
            self.assertEqual(mimeparser.status, '415 Unsupported Media Type')
            self.assertTrue(result.startswith('Available Content Types: '))
            self.assertIn('application/xml', result)
            self.assertIn('application/json', result)

    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment