Skip to content

Instantly share code, notes, and snippets.

@atr000
Forked from zacharyvoase/.gitignore
Created July 20, 2009 22:19
Show Gist options
  • Save atr000/150959 to your computer and use it in GitHub Desktop.
Save atr000/150959 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
# urlobject.py - A utility class for operating on URLs.
"""
urlobject.py - A utility class for operating on URLs.

Example Usage::

    >>> from urlobject import URLObject
    >>> url = URLObject(host='example.com')
    >>> print url
    http://example.com/
    >>> print url / 'some' / 'path'
    http://example.com/some/path
    >>> print url & ('key', 'value')
    http://example.com/?key=value
    >>> print url & ('key', 'value') & ('key2', 'value2')
    http://example.com/?key=value&key2=value2
    >>> print url * 'fragment'
    http://example.com/#fragment
    >>> print url / u'\N{LATIN SMALL LETTER N WITH TILDE}'
    http://example.com/%C3%B1
    >>> url
    <URLObject(u'http://example.com/') at 0x...>
    >>> new_url = url / 'place'
    >>> new_url
    <URLObject(u'http://example.com/place') at 0x...>
    >>> new_url &= 'key', 'value'
    >>> new_url
    <URLObject(u'http://example.com/place?key=value') at 0x...>
    >>> new_url &= 'key2', 'value2'
    >>> new_url
    <URLObject(u'http://example.com/place?key=value&key2=value2') at 0x...>
    >>> new_url |= 'key', 'newvalue'
    >>> new_url
    <URLObject(u'http://example.com/place?key2=value2&key=newvalue') at 0x...>

Important points to note:

    * URLObjects are completely unicode-aware (they subclass ``unicode``). This
      also means that international hostnames will be encoded to IDNA format,
      and international characters in pathnames will be automatically escaped.
      You should continue using unicode values for everything; the various
      components will be en/decoded on-the-fly.

    * ``url & (key, value)`` adds ``key=value`` to URL, even if ``key`` is
      already present as a query parameter. This allows you to have multiple
      appearances of ``key`` in the query.

    * ``url | (key, value)`` adds ``key=value`` to URL, removing any previous
      appearance of ``key`` in the query parameters.

    * ``url & dictionary`` and ``url | dictionary`` work similarly to their
      ``(key, value)`` counterparts, only they add every key, value pair in the
      dictionary to the query string. You can also pass in a list of key, value
      pairs.

    * ``url / 'path'`` adds ``'path'`` to the current path, quoting special
      characters if necessary.

    * ``url // 'path'`` sets the path to ``'path'``, removing the current path
      if present.

    * ``url * 'fragment'`` sets the fragment to ``'fragment'``.

    * ``url ^ 123`` sets the port number to ``123``.

    * ``url.with_*(value)`` can be done with scheme, host, port, path, query and
      fragment, returning a new URL object with the value in that place.

    * ``url.without_port()``, ``url.without_path()``, ``url.without_query()``
      and ``url.without_fragment()`` all exist and do something obvious.

    * Operations return a *new* URL object (URL objects are immutable).

Hints and tips:

    * If a URL's scheme is ``'http'`` and you try to set the port to 80, it is
      equivalent to not specifying the port (same goes for ``'https'``,
      ``'ftp'`` and ``'ftps'`` for their appropriate ports).

    * If you need to end the path with ``'/'``, you can do either ``url / ''``
      or ``url / 'last_component/'``.

    * The query parameters are available as a list through the ``query_list()``
      method and as a dictionary via ``query_dict()``. By default, the latter
      method will return a dictionary with lists as the values, corresponding to
      potential multiple occurrences of the same key. You can just take the last
      value by passing the ``seq=False`` keyword argument to the method.

"""
from functools import partial
import cgi
import copy
import urllib
import urlparse
# Names of the URL parts, in the positional order that URLObject uses when
# indexing the result of urlparse.urlsplit().
URL_COMPONENTS = tuple('scheme host path query fragment'.split())

# Port that is implied by a scheme when the URL does not state one.
SCHEME_PORT_MAP = dict(http=80, https=443, ftp=21, ftps=990)
class URLObject(unicode):
    """An immutable, unicode-based URL with helpers for deriving new URLs.

    Every ``with_*``/``without_*`` method and every operator returns a new
    ``URLObject``; the instance itself is never modified.

    Operators: ``/`` appends a path component, ``//`` replaces the path,
    ``&`` adds query parameters, ``|`` sets (replaces) query parameters,
    ``*`` sets the fragment and ``^`` sets the port.
    """

    def __new__(cls, host='', path='/', scheme='http', query=None, fragment=''):
        """Build a URL from its parts.

        ``host`` is the whole network location (it may carry a ``:port``
        suffix) and is IDNA-encoded.  ``query`` may be an already-encoded
        string, a mapping, or a sequence of ``(key, value)`` pairs; values
        that are themselves sequences are expanded into repeated keys.
        """
        if not isinstance(query, basestring):
            query = encode_query(query or {}, doseq=True)
        # The host slot holds the whole netloc, so its structural delimiters
        # must survive quoting.  With the default safe set the port separator
        # was written out as "%3A", corrupting the URL text.
        netloc = urllib.quote(host.encode('idna'), safe='/:@[]')
        return unicode.__new__(cls,
            urlparse.urlunsplit((
                encode_component(scheme),
                netloc,
                encode_component(path),
                query,
                encode_component(fragment)
            )))

    @classmethod
    def parse(cls, url):
        """Wrap an existing URL string as-is, without re-encoding it."""
        return unicode.__new__(cls, url)

    # Support for urlobj.scheme, urlobj.host, urlobj.path, etc.
    # Each outer lambda is called immediately so that the current loop value
    # is bound; a bare inner lambda would only ever see the last one.
    for i, attr in enumerate(URL_COMPONENTS):
        vars()[attr] = (
            lambda index:
                property(lambda self: decode_component(
                    urlparse.urlsplit(self)[index])))(i)
        vars()['with_' + attr] = (
            lambda param:
                lambda self, value: self.copy(**{param: value}))(attr)

    # Supports without_path(), without_query() and without_fragment().
    for i, attr in enumerate(URL_COMPONENTS[2:]):
        vars()['without_' + attr] = (
            lambda param:
                lambda self: self.copy(**{param: ''}))(attr)

    def components(self):
        """Return a dict mapping each name in URL_COMPONENTS to its value."""
        return dict((name, getattr(self, name)) for name in URL_COMPONENTS)

    def copy(self, **kwargs):
        """Return a new URL of the same type, overriding the given parts."""
        components = self.components()
        components.update(kwargs)
        return type(self)(**components)

    ## Scheme-related methods.

    def secure(self):
        """Return the URL with an ``'s'`` appended to its scheme."""
        return self.with_scheme(self.scheme + 's')

    ## Host-related methods.

    # Overrides the automatically-defined one to add IDNA decoding.
    @property
    def host(self):
        return decode_component(urlparse.urlsplit(self)[1]).decode('idna')

    def with_host(self, host):
        """Return the URL with its network location replaced by ``host``."""
        return self.copy(host=host)

    ## Port-related properties and methods.

    @property
    def port(self):
        """The explicit port, else the scheme's default port, else None."""
        host, port = urllib.splitnport(self.host, defport=None)
        if (self.scheme in SCHEME_PORT_MAP) and (not port):
            return SCHEME_PORT_MAP[self.scheme]
        return port

    def with_port(self, port):
        """Return the URL using ``port``.

        Asking for the default port of the current scheme drops the port
        from the URL instead of spelling it out.
        """
        if self.scheme in SCHEME_PORT_MAP:
            if SCHEME_PORT_MAP[self.scheme] == port:
                return self.without_port()
        host, _ = urllib.splitport(self.host)
        return self.with_host(host + ':' + str(port))

    def without_port(self):
        """Return the URL with any explicit port removed."""
        return self.copy(host=urllib.splitport(self.host)[0])

    ## Query-related methods.

    # Overrides the automatically-defined one: the query is returned in its
    # encoded form so that it can be passed straight back to the constructor.
    @property
    def query(self):
        return urlparse.urlsplit(self)[3]

    def query_list(self):
        """Return the query as a list of decoded ``(key, value)`` pairs."""
        return decode_query(self.query)

    def query_dict(self, seq=True):
        """Return the query as a dictionary.

        With ``seq`` true every value is a list holding all occurrences of
        the key, in order; otherwise only the last occurrence is kept.
        """
        if seq:
            decoded = decode_query(self.query)
            query_dict = {}
            for key, value in decoded:
                query_dict.setdefault(key, []).append(value)
            return query_dict
        return dict(decode_query(self.query))

    def add_query_param(self, key, value):
        """Return the URL with ``key=value`` appended to the query."""
        new_query = decode_query(self.query)
        new_query.append((key, ensure_unicode(value)))
        return self.with_query(new_query)

    def set_query_param(self, key, value):
        """Return the URL with every ``key`` removed and ``key=value`` added."""
        # Decode the same way add_query_param() does, so existing keys and
        # values are unicode text: comparison with a unicode ``key`` is then
        # reliable and non-ASCII values are re-encoded unchanged.
        wanted = ensure_unicode(key)
        new_query = [(old_key, old_value)
                     for old_key, old_value in decode_query(self.query)
                     if old_key != wanted]
        new_query.append((key, ensure_unicode(value)))
        return self.with_query(new_query)

    ## Path-related methods.

    def path_list(self):
        """Return the non-empty ``/``-separated segments of the path."""
        return filter(None, self.path.split('/'))

    def add_path_component(self, path):
        """Return the URL with ``path`` appended to the current path.

        A ``path`` that starts with ``'/'`` replaces the current path.
        """
        if path.startswith('/'):
            new_path = path
        elif self.path.endswith('/'):
            new_path = self.path + path
        else:
            new_path = self.path + '/' + path
        return self.with_path(new_path)

    def parent(self):
        """Return the URL with everything from the last ``'/'`` removed.

        A path with nothing before its last slash (or with no slash at all)
        has the root path ``'/'`` as its parent.
        """
        # rpartition() never raises; the previous rindex() call raised
        # ValueError (not the IndexError being caught) on a slash-less path.
        head = self.path.rpartition('/')[0]
        return self.with_path(head or '/')

    def root(self):
        """Return the URL with its path reset to ``'/'``."""
        return self.with_path('/')

    ## Additional magic methods.

    def __repr__(self):
        return '<URLObject(%r) at 0x%x>' % (unicode(self), id(self))

    @staticmethod
    def _query_pairs(query_param):
        """Normalise a ``&``/``|`` operand into a list of (key, value) pairs."""
        if hasattr(query_param, 'items'):
            return query_param.items()
        # A list whose members are all pairs is a list of parameters; any
        # other sequence (e.g. ``('key', 'value')``) is one single pair.
        if isinstance(query_param, list) and all(
                isinstance(pair, (tuple, list)) for pair in query_param):
            return query_param
        return [query_param]

    def __and__(self, query_param):
        """``url & params``: add parameters, keeping existing keys."""
        new = self
        for pair in self._query_pairs(query_param):
            new = new.add_query_param(*pair)
        return new

    def __or__(self, query_param):
        """``url | params``: set parameters, replacing existing keys."""
        new = self
        for pair in self._query_pairs(query_param):
            new = new.set_query_param(*pair)
        return new

    __div__ = add_path_component
    # ``/`` dispatches here under ``from __future__ import division``.
    __truediv__ = add_path_component
    __floordiv__ = with_path
    __mul__ = with_fragment
    __xor__ = with_port
## Functions to help with escaping international URLs.

# Inclusive (low, high) code point ranges whose characters encode_component()
# percent-escapes as UTF-8.  NOTE(review): these appear to be the ``ucschar``
# and ``iprivate`` ranges of RFC 3987 -- confirm before relying on that.
URL_ESCAPE_RANGES = (
    [(0xA0, 0xD7FF), (0xE000, 0xF8FF), (0xF900, 0xFDCF), (0xFDF0, 0xFFEF)]
    # Planes 1 through 13: everything except the last two code points.
    + list((plane << 16, (plane << 16) + 0xFFFD) for plane in range(0x1, 0xE))
    + [(0xE1000, 0xEFFFD), (0xF0000, 0xFFFFD), (0x100000, 0x10FFFD)]
)
def ensure_unicode(obj):
if isinstance(obj, unicode):
return obj
elif isinstance(obj, str):
return obj.decode('utf-8')
return unicode(obj)
def encode_component(component):
    """Percent-encode one URL component.

    Non-unicode input is handed to ``urllib.quote`` whole.  Unicode input is
    handled character by character: a character inside URL_ESCAPE_RANGES is
    quoted from its UTF-8 bytes, any other ASCII character is quoted
    directly, and everything else is passed through unchanged.
    """
    if not isinstance(component, unicode):
        return urllib.quote(component)

    def escape(char):
        codepoint = ord(char)
        for low, high in URL_ESCAPE_RANGES:
            if low <= codepoint <= high:
                return urllib.quote(char.encode('utf-8'))
        if codepoint < 128:
            return urllib.quote(str(char))
        return char

    return ''.join([escape(char) for char in component])
def decode_component(component):
return urllib.unquote(str(component)).decode('utf-8')
def encode_query(params, doseq=False):
    """Serialise query parameters into a ``k=v&k2=v2`` string.

    ``params`` is a mapping or an iterable of ``(key, value)`` pairs.  When
    ``doseq`` is true the pairs are first passed through transform_doseq(),
    so that sequence values become one pair per element.  Every key and
    value is encoded with encode_component().
    """
    pairs = params.items() if hasattr(params, 'items') else params
    if doseq:
        pairs = transform_doseq(pairs)
    encoded_pairs = []
    for pair in pairs:
        encoded_pairs.append('='.join([encode_component(part) for part in pair]))
    return '&'.join(encoded_pairs)
def transform_doseq(items):
new_items = []
for key, value in items:
if hasattr(value, '__iter__') and not isinstance(value, basestring):
for subvalue in value:
new_items.append((key, subvalue))
else:
new_items.append((key, value))
return new_items
def decode_query(query):
return [(key.decode('utf-8'), value.decode('utf-8'))
for key, value in cgi.parse_qsl(str(query))]
if __name__ == '__main__':
    # When run as a script, execute the usage examples in the module
    # docstring as doctests; ELLIPSIS lets "0x..." match any object address.
    import doctest
    doctest.testmod(optionflags=doctest.ELLIPSIS)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment