Skip to content

Instantly share code, notes, and snippets.

@almet almet/
Created Jul 7, 2010

What would you like to do?
"""Provides the PyPIDistribution class, which represents a distribution
retrieved from PyPI.
"""
import re
import urlparse
import urllib
import tempfile
from operator import attrgetter
import hashlib
except ImportError:
from distutils2._backport import hashlib
from distutils2.version import suggest_normalized_version, NormalizedVersion
from distutils2.pypi.errors import HashDoesNotMatch, UnsupportedHashName
# Archive suffixes that are removed from a file name before it is split
# into a distribution name and a version.
EXTENSIONS = [".tar.gz", ".tar.bz2", ".tar", ".zip", ".tgz", ".egg"]
# Matches a URL ending in a "#md5=<hex digest>" fragment; group 1 is the
# digest.
MD5_HASH = re.compile(r'^.*#md5=([a-f0-9]+)$')
class PyPIDistribution(object):
"""Represents a distribution retrieved from PyPI.
This is a simple container for various attributes such as name, version,
downloaded_location, url etc.
The PyPIDistribution class is used by the pypi.*Index class to return
information about distributions.
"""
@classmethod
def from_url(cls, url, probable_dist_name=None, is_external=True):
    """Build a Distribution from a url archive (egg or zip or tgz).

    :param url: complete url of the distribution
    :param probable_dist_name: A probable name of the distribution.
                               It is accepted but not used by this method.
    :param is_external: Tell if the url comes from an index or from
                        an external URL.
    :returns: a PyPIDistribution when the archive name ends with one of
              EXTENSIONS, otherwise None.
    """
    # NOTE(review): the right-hand side of the md5_hash assignment, the
    # decorator and the last keyword arguments of the constructor call were
    # missing from the extracted text. They are restored here from
    # MD5_HASH's single capture group and from __init__'s signature --
    # confirm against the upstream file.

    # if the url contains a md5 hash, get it.
    md5_hash = None
    match = MD5_HASH.match(url)
    if match is not None:
        md5_hash = match.group(1)
        # remove the hash
        url = url.replace("#md5=%s" % md5_hash, "")

    # parse the archive name to find dist name and version
    archive_name = urlparse.urlparse(url)[2].split('/')[-1]
    extension_matched = False
    # remove the extension from the name
    for ext in EXTENSIONS:
        if archive_name.endswith(ext):
            archive_name = archive_name[:-len(ext)]
            extension_matched = True

    # The name is split even when no extension matched, as in the
    # extracted text; the result is only used when one did.
    name, version = split_archive_name(archive_name)
    if extension_matched is True:
        return PyPIDistribution(name, version, url=url, url_hashname="md5",
                                url_hashval=md5_hash,
                                url_is_external=is_external)
def __init__(self, name, version, type=None, url=None, url_hashname=None,
             url_hashval=None, url_is_external=True):
    """Create a new instance of PyPIDistribution.

    :param name: the name of the distribution
    :param version: the version of the distribution; it is wrapped in a
                    NormalizedVersion.
    :param type: the type of the dist (eg. source, bin-*, etc.)
    :param url: URL where we found this distribution
    :param url_hashname: the name of the hash we want to use. Refer to the
                         documentation for more information.
    :param url_hashval: the hash value.
    :param url_is_external: we need to know if the provided url comes from
                            an index browsing, or from an external resource.
    """
    # The extracted text read '""" = name': the closing docstring quotes
    # and the attribute being assigned were fused together and lost.
    self.name = name
    self.version = NormalizedVersion(version)
    self.type = type
    # set the downloaded path to None by default. The goal here
    # is to not download distributions multiple times
    self.downloaded_location = None
    # URLs are stored as dicts because more information than the bare URL
    # is needed later to pick the right one.
    # There are two attributes: _urls holds one dict per known URL, and
    # _url caches the chosen one so the selection is not repeated.
    self._urls = []
    self._url = None
    self.add_url(url, url_hashname, url_hashval, url_is_external)
def add_url(self, url, hashname=None, hashval=None, is_external=True):
    """Add a new url to the list of urls.

    :param url: the URL to record
    :param hashname: name of the hash algorithm for hashval, or None
    :param hashval: expected hash value, or None
    :param is_external: whether the URL comes from an external resource
    :raises UnsupportedHashName: when hashlib rejects hashname
    """
    # NOTE(review): the "try:" line, the hashlib call it guarded and the
    # "self._urls.append({" / "})" wrapper were missing from the extracted
    # text. They are restored from the surviving "except ValueError" clause
    # and the orphaned dict entries -- confirm against upstream.
    if hashname is not None:
        try:
            # hashlib.new raises ValueError for an unknown algorithm name
            hashlib.new(hashname)
        except ValueError:
            raise UnsupportedHashName(hashname)

    self._urls.append({
        'url': url,
        'hashname': hashname,
        'hashval': hashval,
        'is_external': is_external,
    })
    # reset the url selection process
    self._url = None
@property
def url(self):
    """Pick up the right url from the list of urls in self._urls.

    Internal urls are preferred over external ones; when several
    candidates qualify, the first one is returned. The choice is cached in
    self._url until add_url resets it.

    This is a property because download and _check_md5 read it as an
    attribute (``self.url['url']``).

    :returns: the chosen url dict
    :raises IndexError: when no url has been registered
    """
    if self._url is None:
        for candidate in self._urls:
            # keep the exact "== False" test: a None flag counts as external
            if candidate['is_external'] == False:
                self._url = candidate
                break
        else:
            # no internal url at all: fall back to the first one known
            self._url = self._urls[0]
    return self._url
def is_source(self):
    """Tell whether the type of this distribution is 'source'."""
    dist_type = self.type
    return dist_type == 'source'
def is_final(self):
    """Return the ``is_final`` attribute of this distribution's version."""
    version = self.version
    return version.is_final
def download(self, path=None):
    """Download the distribution to a path, and return it.

    If a directory is given in path, the archive is stored there;
    otherwise a new temporary directory is created for it. The download
    happens only once: later calls return the recorded location.

    :param path: directory to download into, or None
    :returns: the location of the downloaded archive
    """
    # local import: os is not among the module-level imports
    import os

    # if we do not have downloaded it yet, do it.
    if self.downloaded_location is None:
        if path is None:
            # created only when a download really happens, so repeated
            # calls do not leave empty temporary directories behind
            path = tempfile.mkdtemp()
        url = self.url['url']
        archive_name = urlparse.urlparse(url)[2].split('/')[-1]
        target = os.path.join(path, archive_name)
        self.downloaded_location = urllib.urlretrieve(url, target)[0]
        # NOTE(review): _check_md5 is never called in the visible text;
        # confirm whether the downloaded file should be verified here.
    return self.downloaded_location
def _check_md5(self, filename):
    """Check that the checksum of the given file matches the one recorded
    for the selected url.

    Nothing is checked when the selected url has no hash name or no hash
    value.

    :param filename: path of the file to verify
    :raises HashDoesNotMatch: when the computed digest differs from the
                              expected one
    """
    hashname = self.url['hashname']
    expected_hashval = self.url['hashval']
    if None in (expected_hashval, hashname):
        return

    # NOTE(review): the statements that built the hash object were missing
    # from the extracted text ("hashval =" had no right-hand side). It is
    # rebuilt here with hashlib.new(hashname), the same call add_url uses
    # to validate the name -- confirm against upstream.
    # The file is read in binary mode so the digest covers the exact bytes
    # on disk, and it is always closed.
    f = open(filename, 'rb')
    try:
        hashval = hashlib.new(hashname)
        hashval.update(f.read())
    finally:
        f.close()

    if hashval.hexdigest() != expected_hashval:
        raise HashDoesNotMatch("got %s instead of %s"
            % (hashval.hexdigest(), expected_hashval))
def __repr__(self):
    """Return "<class name> <name> <version> <type>".

    The type part is an empty string when the type is unset.
    """
    # The extracted text had ",," where the second value belongs; with
    # four "%s" slots the missing value is the distribution name.
    return "%s %s %s %s" \
        % (self.__class__.__name__, self.name, self.version,
           self.type or "")
def _check_is_comparable(self, other):
    """Raise TypeError unless other can be compared with this distribution.

    :param other: the object to compare with
    :raises TypeError: when other is not a PyPIDistribution, or when it
                       has a different name
    """
    # NOTE(review): the operands of the "elif" test and of the second
    # message were missing from the extracted text. They are restored as
    # the two distribution names -- confirm against upstream.
    if not isinstance(other, PyPIDistribution):
        raise TypeError("cannot compare %s and %s"
            % (type(self).__name__, type(other).__name__))
    elif self.name != other.name:
        raise TypeError("cannot compare %s and %s"
            % (self.name, other.name))
# Rich comparisons: distributions are ordered by their version only.
# NOTE(review): _check_is_comparable is not called by any of these methods
# in the visible text -- confirm whether they should validate `other` first.

def __eq__(self, other):
    """Equal when both versions are equal."""
    mine, theirs = self.version, other.version
    return mine == theirs

def __lt__(self, other):
    """Smaller when this version is smaller than the other one."""
    mine, theirs = self.version, other.version
    return mine < theirs

def __ne__(self, other):
    """Negation of __eq__."""
    equal = self.__eq__(other)
    return not equal

def __gt__(self, other):
    """Greater when neither smaller nor equal."""
    if self.__lt__(other):
        return False
    return not self.__eq__(other)

def __le__(self, other):
    """Equal or smaller."""
    equal = self.__eq__(other)
    if equal:
        return equal
    return self.__lt__(other)

def __ge__(self, other):
    """Equal or greater."""
    equal = self.__eq__(other)
    if equal:
        return equal
    return self.__gt__(other)

# Hashing stays identity based even though __eq__ is overridden.
# (The reference that followed "See" in the original comment was lost.)
__hash__ = object.__hash__
class PyPIDistributions(list):
"""A container of PyPIDistribution objects.
Contains methods and facilities to sort and filter distributions.
"""
def __init__(self, list=()):
    """Create the container and add every item of ``list`` to it.

    :param list: iterable of distributions to add (empty by default)
    """
    # The underlying list is created empty and filled one item at a time,
    # so the items are never copied in directly by the base constructor.
    # NOTE(review): the body of this loop was missing from the extracted
    # text; it is restored as a call to self.append -- confirm against
    # upstream.
    super(PyPIDistributions, self).__init__()
    for item in list:
        self.append(item)
def filter(self, predicate):
"""Filter the distributions and return a subset of distributions that
match the given predicate
"""
# NOTE(review): this statement is incomplete in the extracted text: both
# operands of the "==" comparison and everything after "and" are missing,
# so the exact matching rule cannot be read from here. Restore it from the
# upstream file -- TODO.
# The result is wrapped in a new PyPIDistributions instance.
return PyPIDistributions(
[dist for dist in self if == and
def get_last(self, predicate, prefer_source=None, prefer_final=None):
    """Return the most up to date version, that satisfy the given
    predicate.

    The distributions kept by ``filter`` are sorted with
    ``sort_distributions`` (reverse order) and the first one is returned.

    :param predicate: passed unchanged to ``filter``
    :param prefer_source: passed unchanged to ``sort_distributions``
    :param prefer_final: passed unchanged to ``sort_distributions``
    :raises IndexError: when no distribution is left after filtering
    """
    matching = self.filter(predicate)
    matching.sort_distributions(prefer_source, prefer_final, reverse=True)
    newest = matching[0]
    return newest
def get_same_name_and_version(self):
    """Return lists of PyPIDistribution objects that refer to the same
    name and version number. This does not consider the type (source,
    binary, ...).

    :returns: a list of lists; each inner list holds at least two
              distributions sharing one (name, version) pair, in the order
              the pairs are first met.
    """
    # NOTE(review): the ".name" operands and the body of the last "if" were
    # missing from the extracted text. They are restored from the docstring
    # and from the returned "duplicates" list -- confirm against upstream.
    processed = []
    duplicates = []
    for dist in self:
        key = (dist.name, dist.version)
        # a list lookup keeps the comparison based on equality, so the
        # version objects do not need to be hashable
        if key not in processed:
            processed.append(key)
            found_duplicates = [d for d in self if d.name == dist.name and
                                d.version == dist.version]
            if len(found_duplicates) > 1:
                duplicates.append(found_duplicates)
    return duplicates
def append(self, o):
    """Append a new distribution to the list.

    If a distribution with the same name, version and type exists, just
    grab the URL information and add a new url to the existing one.

    :param o: the distribution to add
    """
    # NOTE(review): the ".name" operands, the statement that merges the url
    # and the "else:" line were missing from the extracted text. Without
    # the "else:" the duplicate was always appended, contradicting the
    # docstring. The merge is restored as add_url(**o.url), whose dict keys
    # match add_url's parameters; this assumes ``url`` is read as an
    # attribute, as download does -- confirm against upstream.
    similar_dists = [d for d in self if d.name == o.name and
                     d.version == o.version and d.type == o.type]
    if len(similar_dists) > 0:
        dist = similar_dists[0]
        dist.add_url(**o.url)
    else:
        super(PyPIDistributions, self).append(o)
def sort_distributions(self, prefer_source=True, prefer_final=False,
reverse=True, *args, **kwargs):
"""order the results with the given properties"""
# NOTE(review): the bodies of the two "if" statements below are missing from
# the extracted text, so the attribute names placed in sort_by (and their
# order) cannot be read from here. Restore them from upstream -- TODO.
# sort_by lists the attribute names that make up the sort key of each item.
sort_by = []
if prefer_final:
if prefer_source:
# The in-place list sort is called with a key built from the sort_by
# attributes; extra positional/keyword arguments are passed through.
super(PyPIDistributions, self).sort(
key=lambda i: [getattr(i, arg) for arg in sort_by],
reverse=reverse, *args, **kwargs)
def split_archive_name(archive_name, probable_name=None):
    """Split an archive name into two parts: name and version.

    Return the tuple (name, version)

    :param archive_name: the archive file name to split (from_url passes it
                         after removing a known extension)
    :param probable_name: optional hint for the distribution name. It is
                          used when the archive name starts with it
                          followed by "-" and the rest is a usable version.
    :returns: (lower-cased name, version suggested by
              suggest_normalized_version)
    :raises CantParseArchiveName: when the name or the version is empty
    """
    # NOTE(review): CantParseArchiveName is not among the names imported at
    # the top of the visible file -- confirm that it is in scope.

    # Try to determine which part is the name and which is the version
    # using the "-" separator. Take the larger part to be the version
    # number, then reduce it if that does not work.
    def eager_split(text, maxsplit=2):
        # split using the "-" separator
        splits = text.rsplit("-", maxsplit)
        name = splits[0]
        version = "-".join(splits[1:])
        if version.startswith("-"):
            version = version[1:]
        if suggest_normalized_version(version) is None and maxsplit >= 0:
            # we dont get a good version number: recurse !
            return eager_split(text, maxsplit - 1)
        return (name, version)

    name = None
    version = None
    if probable_name is not None:
        probable_name = probable_name.lower()
        # The original used archive_name.lstrip(name), which strips a *set
        # of characters* and not a prefix; the "else:" before the
        # eager_split call was also lost, so this branch had no effect.
        # The name is now removed as a real prefix, and the hint is only
        # trusted when what remains is a usable version.
        prefix = probable_name + "-"
        if archive_name.lower().startswith(prefix):
            candidate = archive_name[len(prefix):]
            if suggest_normalized_version(candidate) is not None:
                name = probable_name
                version = candidate

    if name is None:
        name, version = eager_split(archive_name)

    version = suggest_normalized_version(version)
    # A None version (nothing could be suggested) still passes this test,
    # as in the original; from_url relies on getting a result even for
    # names it later discards.
    if version != "" and name != "":
        return (name.lower(), version)
    raise CantParseArchiveName(archive_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.