Skip to content

Instantly share code, notes, and snippets.

@diorcety
Created November 14, 2013 19:46
Show Gist options
  • Save diorcety/7473146 to your computer and use it in GitHub Desktop.
Save diorcety/7473146 to your computer and use it in GitHub Desktop.
Mimeparser
"""
RESTful resource variant selection using the HTTP Content-Type.
"""
__version__ = '0.5.3'
__author__ = 'Martin Blech <martinblech@gmail.com>'
__license__ = 'MIT'
import mimeparse
from functools import wraps
import re
class MimeParserException(Exception): pass
XML = 'xml'
JSON = 'json'
JSONP = 'jsonp'
BSON = 'bson'
YAML = 'yaml'
XHTML = 'xhtml'
HTML = 'html'
TXT = 'txt'
CSV = 'csv'
TSV = 'tsv'
RSS = 'rss'
RDF = 'rdf'
ATOM = 'atom'
M3U = 'm3u'
PLS = 'pls'
XSPF = 'xspf'
ICAL = 'ical'
KML = 'kml'
KMZ = 'kmz'
_MIME_TYPES = {
XML: ('application/xml', 'text/xml', 'application/x-xml',),
JSON: ('application/json',),
JSONP: ('application/javascript',),
BSON: ('application/bson',),
YAML: ('application/x-yaml', 'text/yaml',),
XHTML: ('application/xhtml+xml',),
HTML: ('text/html',),
TXT: ('text/plain',),
CSV: ('text/csv',),
TSV: ('text/tab-separated-values',),
RSS: ('application/rss+xml',),
RDF: ('application/rdf+xml',),
ATOM: ('application/atom+xml',),
M3U: ('audio/x-mpegurl', 'application/x-winamp-playlist', 'audio/mpeg-url', 'audio/mpegurl',),
PLS: ('audio/x-scpls',),
XSPF: ('application/xspf+xml',),
ICAL: ('text/calendar',),
KML: ('application/vnd.google-earth.kml+xml',),
KMZ: ('application/vnd.google-earth.kmz',),
}
def register_mime(shortname, mime_types):
"""
Register a new mime type.
Usage example:
mimeparser.register_mime('svg', ('application/x-svg', 'application/svg+xml',))
After this you can do:
@mimeparser.mimeparser(svg=render_svg)
def GET(...
...
"""
if shortname in _MIME_TYPES:
raise MimeParserException('"%s" has already been registered'%shortname)
_MIME_TYPES[shortname] = mime_types
def _get_mime_types(shortname):
try:
return _MIME_TYPES[shortname]
except KeyError:
raise MimeParserException('No known mime types for "%s"'%shortname)
def _get_short_mime(mime):
for shortmime, mimes in _MIME_TYPES.items():
if mime in mimes:
return shortmime
raise MimeParserException('No short mime for type "%s"' % mime)
def _best_mime(supported, accept_string=None):
if accept_string is None:
return None
return mimeparse.best_match(supported, accept_string)
VARY_SEPARATOR = re.compile(r',\s*')
def _fix_headers(headers, content_type):
fixed_headers = []
found_vary = False
found_content_type = False
for k, v in headers:
if k.lower() == 'vary':
found_vary = True
if 'accept' not in VARY_SEPARATOR.split(v.strip().lower()):
v = v + ',Accept'
if k.lower() == 'content-type':
found_content_type = True
fixed_headers.append((k, v))
if not found_vary:
fixed_headers.append(('Vary', 'Accept'))
if not found_content_type:
fixed_headers.append(('Content-Type', content_type))
return fixed_headers
class MimeParserBase(object):
def __init__(self, global_default=None, global_override_arg_idx=None,
global_override_input_key=None, global_charset=None,
global_unsupported_media_type_callback=None):
self.global_default = global_default
self.global_override_arg_idx = global_override_arg_idx
self.global_override_input_key = global_override_input_key
self.global_charset = global_charset
self.global_unsupported_media_type_callback = global_unsupported_media_type_callback
def __call__(self, default=None, override_arg_idx=None,
override_input_key=None, charset=None,
unsupported_media_type_callback=None,
**parsers):
"""
Main mimeparser decorator. Usage::
@mimeparser(default='xml', override_arg_idx=-1, override_input_key='format', , <parsers>)
GET(self, ...) (or POST, etc.)
The decorated function must return a dict with the objects necessary to
render the final result to the user. The selected parser will be
called with the dict contents as keyword arguments.
If override_arg_idx isn't None, the wrapped function's positional
argument at that index will be used instead of the Accept header.
override_input_key works the same way, but with web.input().
Example::
@mimeparser(
default = 'xml',
override_arg_idx = -1,
override_input_key = 'format',
xhtml = xhtml_templates.greet,
html = xhtml_templates.greet,
xml = xml_templates.greet,
json = json_parser,
yaml = json_parser,
txt = json_parser,
)
def greet(self, param):
message = 'Hello, %s!'%param
return {'message':message}
"""
if not parsers:
raise ValueError('need at least one parser')
def get_parser(mime):
try:
return parser_dict[mime]
except KeyError:
raise MimeParserException('No parser for mime "%s"'%mime)
if not default: default = self.global_default
if not override_arg_idx:
override_arg_idx = self.global_override_arg_idx
if not override_input_key:
override_input_key = self.global_override_input_key
if not charset: charset = self.global_charset
if not unsupported_media_type_callback:
unsupported_media_type_callback = self.global_unsupported_media_type_callback
supported = list()
parser_dict = dict()
for shortname, parser in parsers.items():
for mime in _get_mime_types(shortname):
supported.append(mime)
parser_dict[mime] = parser
if default:
default_mimes = _get_mime_types(default)
# default mime types should be last in the supported list
# (which means highest priority to mimeparse)
for mime in reversed(default_mimes):
supported.remove(mime)
supported.append(mime)
default_mime = default_mimes[0]
default_parser = get_parser(default_mime)
else:
default_mime, default_parser = next(iter(parser_dict.items()))
def wrap(target):
@wraps(target)
def wrapper(*args, **kwargs):
mime = None
shortmime = None
if override_arg_idx != None:
shortmime = args[override_arg_idx]
if not shortmime and override_input_key:
shortmime = self._get_request_parameter(override_input_key)
if shortmime: mime = _get_mime_types(shortmime)[0]
content_type_header = self._get_content_type_header()
if not mime:
if content_type_header:
mime = _best_mime(supported, content_type_header)
else:
mime = default_mime
if mime:
parser = get_parser(mime)
else:
if unsupported_media_type_callback:
content_type, entity = unsupported_media_type_callback(
content_type_header, supported)
return self._make_response(entity,
(('Content-Type',
content_type),),
'415 Unsupported Media Type')
else:
mime, parser = default_mime, default_parser
if not shortmime: shortmime = _get_short_mime(mime)
try:
old_data = self._get_data()
new_data = parser(old_data)
self._set_data(new_data)
except BaseException as e:
raise MimeParserException('Parsing error: %s' % (str(e)))
context_vars = dict(
mimeparser_shortmime=shortmime,
mimeparser_mime=mime,
mimeparser_parser=parser)
for key, value in context_vars.items():
self._set_context_var(key, value)
try:
result = target(*args, **kwargs)
finally:
for key in context_vars.keys():
self._clear_context_var(key)
self._set_data(old_data)
return result
return wrapper
return wrap
def _get_request_parameter(self, key, default=None):
return default
def _get_accept_header(self, default=None):
return default
def _set_context_var(self, key, value):
pass
def _clear_context_var(self, key):
pass
def _get_data(self):
return ""
def _set_data(self, data):
pass
def _make_response(self, content, headers, status):
return content
# web.py implementation
try:
import web
class WebPyMimeParser(MimeParserBase):
def _get_request_parameter(self, key, default=None):
return web.input().get(key, default)
def _get_content_type_header(self, default=None):
return web.ctx.env.get('CONTENT_TYPE', default)
def _set_context_var(self, key, value):
web.ctx[key] = value
def _clear_context_var(self, key):
del web.ctx[key]
def _get_data(self):
return web.data()
def _set_data(self, data):
web.ctx.data = data
def _make_response(self, content, headers, status):
web.ctx.status = status
for k, v in headers:
web.header(k, v)
return content
except ImportError:
pass
# unit tests
if __name__ == "__main__":
try:
import unittest2 as unittest
except ImportError:
import unittest
class TestMimeParser(MimeParserBase):
def __init__(self, data=None, content_type=None, request_parameters=None,
*args, **kwargs):
super(TestMimeParser, self).__init__(*args, **kwargs)
self.request_parameters = request_parameters or {}
self.content_type_header = content_type
self.ctx = {}
self.headers = {}
self.data = data
def _get_request_parameter(self, key, default=None):
return self.request_parameters.get(key, default)
def _get_content_type_header(self, default=None):
return self.content_type_header
def _set_context_var(self, key, value):
self.ctx[key] = value
def _clear_context_var(self, key):
del self.ctx[key]
def _get_data(self):
return self.data
def _set_data(self, data):
self.data = data
def _make_response(self, content, headers, status):
self.status = status
for k, v in headers:
self.headers[k] = v
return content
class MimeParserTests(unittest.TestCase):
def test_single_variant(self):
mimeparser = TestMimeParser('test', 'text/xml')
result = mimeparser(
xml=lambda x: '<xml>%s</xml>' % x
)(lambda: dict())()
self.assertEqual(mimeparser.data, '<xml>test</xml>')
def test_noparserers(self):
try:
TestMimeParser()()
self.fail('should fail with ValueError')
except ValueError:
pass
def test_select_variant(self):
mimeparser = TestMimeParser()
handler = mimeparser(
default='txt',
override_input_key='mime',
txt=lambda x: 'txt:%s' %x,
xml=lambda x: 'xml:%s' % x,
json=lambda x: 'json:%s' % x,
html=lambda x: 'html:%s' % x,
)(lambda x: dict(x=x))
mimeparser.data = 'default'
result = handler(None)
self.assertEqual(mimeparser.data, 'txt:default')
mimeparser.data = 'a'
mimeparser.content_type_header = 'application/xml'
result = handler(None)
self.assertEqual(mimeparser.data, 'xml:a')
mimeparser.data = 'b'
mimeparser.content_type_header = 'application/json'
result = handler(None)
self.assertEqual(mimeparser.data, 'json:b')
mimeparser.data = 'c'
mimeparser.request_parameters['mime'] = 'html'
result = handler(None)
self.assertEqual(mimeparser.data, 'html:c')
def test_decorated_function_name(self):
def vanilla_function(): pass
mimeparser = TestMimeParser()
decorated_function = mimeparser(xml=None)(vanilla_function)
self.assertEqual(vanilla_function.__name__,
decorated_function.__name__)
def test_unsupported_media_type(self):
mimeparser = TestMimeParser()
# default behavior, pick default even if not acceptable
handler = mimeparser(
default='json',
xml=lambda x: 'xml:%s' %x,
json=lambda x: 'json:%s' %x,
)(lambda x: dict(x=x))
mimeparser.content_type_header = 'text/plain'
mimeparser.data = 'default'
result = handler(None)
self.assertEqual(mimeparser.data, 'json:default')
# optional: fail with 406
handler = mimeparser(
unsupported_media_type_callback= lambda _, sup: (
'text/plain',
'Available Content Types: ' + ', '.join(sup)),
default='json',
xml=lambda x: 'xml:%s' %x,
json=lambda x: 'json:%s' %x,
)(lambda x: dict(x=x))
mimeparser.content_type_header = 'text/plain'
mimeparser.data = 'default'
result = handler(None)
self.assertEqual(mimeparser.status, '415 Unsupported Media Type')
self.assertTrue(result.startswith('Available Content Types: '))
self.assertTrue(result.find('application/xml') != -1)
self.assertTrue(result.find('application/json') != -1)
unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment