Skip to content

Instantly share code, notes, and snippets.

@harlowja
Created January 22, 2015 02:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save harlowja/5e39ec5ca9e3f0d9a21f to your computer and use it in GitHub Desktop.
Save harlowja/5e39ec5ca9e3f0d9a21f to your computer and use it in GitHub Desktop.
Pippin.py
import collections
import itertools
import os
from distutils import version as dist_version
from pip import req as pip_req
import pkg_resources
import argparse
import requests
import six
from six.moves import urllib
try:
from pip import util as pip_util
except ImportError:
# pip >=6 changed this location for some reason...
from pip import utils as pip_util
_FINDER_URL_TPL = 'http://pypi.python.org/pypi/%s/json'
class RequirementException(Exception):
pass
class NotFoundException(Exception):
pass
def create_parser():
parser = argparse.ArgumentParser()
parser.add_argument(
"-r", "--requirement",
dest="requirements",
nargs="*",
default=[],
metavar="<file>",
help="Analyze all the packages listed in the given requirements file")
return parser
def parse_requirements(options):
def req_key(req):
return req.req.key
all_requirements = {}
for filename in options.requirements:
try:
for req in pip_req.parse_requirements(filename):
all_requirements.setdefault(req_key(req), []).append(req)
except Exception as ex:
raise RequirementException("Cannot parse `%s': %s" % (filename, ex))
return all_requirements
def find_versions(pkg_name):
def sorter(r1, r2):
return cmp(r1[1], r2[1])
url = _FINDER_URL_TPL % (urllib.parse.quote(pkg_name))
resp = requests.get(url)
resp_data = resp.json()
releases = []
for v, release_infos in six.iteritems(resp_data['releases']):
rel = None
for r in release_infos:
if r['packagetype'] == 'sdist':
rel = r['url']
if rel is None:
raise NotFoundException("No sdist url found for package"
" %s v%s" % (pkg_name, v))
releases.append((str(v), dist_version.StrictVersion(v),
pkg_resources.Requirement.parse(v), rel))
return sorted(releases, cmp=sorter)
def dump_requirements(requirements):
for k in six.iterkeys(requirements):
k_restrictions = []
if isinstance(requirements[k], (list, tuple)):
for r in requirements[k]:
if r.req.specs:
k_restrictions.extend(["".join(s) for s in r.req.specs])
else:
r = requirements[k]
k_restrictions.extend(["".join(s) for s in r.req.specs])
if k_restrictions:
print("- %s %s" % (k, k_restrictions))
else:
print("- %s" % (k))
def match_available(req, available):
looked_in = []
for a in reversed(available):
v = a[0]
if v in req:
line = "%s==%s" % (req.key, v)
m_req = pip_req.InstallRequirement.from_line(line)
return m_req
else:
looked_in.append(v)
raise NotFoundException("No requirement found that"
" matches '%s' (tried %s)" % (req, looked_in))
def find_match(pkg_name, req):
return match_available(req.req, find_versions(pkg_name))
def probe(requirements, gathered):
if not requirements:
return {}
requirements = requirements.copy()
gathered = gathered.copy()
# Pick one of the requirements, get a version that works with the current
# known siblings (other requirements that are requested along side this
# requirement) and then recurse trying to get another requirement that
# will work, if this is not possible, backtrack and try a different
# version instead (and repeat)...
pkg_name, pkg_requirements = requirements.popitem()
for req in pkg_requirements:
local_compat = False
try:
m = find_match(pkg_name, req)
except RequirementException:
pass
else:
# TODO: Check if compatible with what we already gathered...
local_compat = True
if local_compat:
# TODO: download the selected version, add all of its requirements
# into the currently being worked on requirements dictionary,
# and then continue...
#
# Freeze at this version and continue probing other packages...
gathered[pkg_name] = m
try:
result = probe(requirements, gathered)
except RequirementException:
gathered.pop(pkg_name)
else:
gathered.update(result)
return gathered
pkg_requirements = [req.req for req in pkg_requirements]
raise RequirementException("No requirement found that"
" matches '%s'" % (pkg_requirements))
def main():
parser = create_parser()
options = parser.parse_args()
initial = parse_requirements(options)
print("Initial package set:")
dump_requirements(initial)
# TODO: use these to download (via a request to pip) the dependencies we
# find that might work, so that we can then extract there desired/needed
# requirements and then continue probing....
for d in ['.cache', '.build', '.download']:
if not os.path.isdir(os.path.join(os.getcwd(), d)):
os.makedirs(os.path.join(os.getcwd(), d))
matches = probe(initial, {})
print("Deep package set:")
dump_requirements(matches)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment