Pippin.py
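A rough prototype of a backtracking dependency resolver built on pip's internals (pip 1.x / early 6.x era): it reads one or more requirements files, queries PyPI for candidate sdist releases, extracts each candidate's own dependencies, and recursively probes for a mutually installable set.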
import argparse
import contextlib
import copy
import os
import shutil
import tempfile
from distutils import version as dist_version

import pkg_resources
import requests
import six
from six.moves import urllib

from pip import req as pip_req
try:
    from pip import util as pip_util
except ImportError:
    # pip >= 6 changed this location for some reason...
    from pip import utils as pip_util

_FINDER_URL_TPL = 'http://pypi.python.org/pypi/%s/json'
_EGGS_DETAILED = {}
class RequirementException(Exception):
    pass


class NotFoundException(Exception):
    pass
def req_key(req):
    return req.req.key
@contextlib.contextmanager
def tempdir(**kwargs):
    # An equivalent of this was only added in python 3.2, so provide our
    # own here since it's useful...
    # See: http://bugs.python.org/file12970/tempdir.patch
    tdir = tempfile.mkdtemp(**kwargs)
    try:
        yield tdir
    finally:
        shutil.rmtree(tdir)
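# For reference, python >= 3.2 ships an equivalent context manager in the
# standard library, so on newer interpreters the helper above could be
# replaced with:
#
#     with tempfile.TemporaryDirectory() as tdir:
#         ...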
def get_directory_details(path):
    if not os.path.isdir(path):
        raise IOError("Cannot detail non-existent directory %s" % (path))
    path = os.path.abspath(path)
    cache_key = "d:%s" % (path)
    if cache_key in _EGGS_DETAILED:
        return _EGGS_DETAILED[cache_key]
    req = pip_req.InstallRequirement.from_line(path)
    req.source_dir = path
    req.run_egg_info()
    dependencies = []
    for d in req.requirements():
        # Skip editable requirements and comment-only lines, and strip any
        # trailing inline comment from the rest (d.find("#") is falsy only
        # when the line starts with "#").
        if not d.startswith("-e") and d.find("#"):
            d = d.split("#")[0]
            d = d.strip()
            if d:
                dependencies.append(d)
    details = {
        'req': req.req,
        'dependencies': dependencies,
        'name': req.name,
        'pkg_info': req.pkg_info(),
        'dependency_links': req.dependency_links,
        'version': req.installed_version,
    }
    _EGGS_DETAILED[cache_key] = details
    return details
def get_archive_details(filename):
    if not os.path.isfile(filename):
        raise IOError("Cannot detail non-existent file %s" % (filename))
    cache_key = "f:%s:%s" % (os.path.basename(filename),
                             os.path.getsize(filename))
    if cache_key in _EGGS_DETAILED:
        return _EGGS_DETAILED[cache_key]
    with tempdir() as t_dir:
        t_filename = os.path.join(t_dir, os.path.basename(filename))
        shutil.copyfile(filename, t_filename)
        extract_to = os.path.join(t_dir, 'build')
        os.makedirs(extract_to)
        pip_util.unpack_file(t_filename, extract_to, content_type='', link='')
        details = get_directory_details(extract_to)
    _EGGS_DETAILED[cache_key] = details
    return details
def create_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-r", "--requirement",
        dest="requirements",
        nargs="*",
        default=[],
        metavar="<file>",
        help="Analyze all the packages listed in the given requirements file")
    return parser
def parse_requirements(options):
    all_requirements = {}
    for filename in options.requirements:
        try:
            for req in pip_req.parse_requirements(filename):
                all_requirements.setdefault(req_key(req), []).append(req)
        except Exception as ex:
            raise RequirementException("Cannot parse `%s': %s"
                                       % (filename, ex))
    return all_requirements
def find_versions(pkg_name):
    url = _FINDER_URL_TPL % (urllib.parse.quote(pkg_name))
    resp = requests.get(url)
    resp_data = resp.json()
    releases = []
    for v, release_infos in six.iteritems(resp_data['releases']):
        rel = None
        for r in release_infos:
            if r['packagetype'] == 'sdist':
                rel = r['url']
        if rel is None:
            # TODO: does it matter that no sdist was found?
            continue
        try:
            releases.append((str(v), dist_version.StrictVersion(v),
                             pkg_resources.Requirement.parse(v), rel))
        except ValueError:
            # Versions that StrictVersion can not parse are skipped.
            pass
    # Sort by the parsed version, oldest first.
    return sorted(releases, key=lambda r: r[1])
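# For context, the JSON document the finder url returns looks roughly like
# this (only the fields used above are shown):
#
#     {"releases": {
#         "1.0.0": [{"packagetype": "sdist",
#                    "url": "https://pypi.python.org/.../pkg-1.0.0.tar.gz"}],
#         "1.1.0": [...]
#     }}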
def dump_requirements(requirements):
    for k in six.iterkeys(requirements):
        k_restrictions = []
        if isinstance(requirements[k], (list, tuple)):
            for r in requirements[k]:
                if r.req.specs:
                    k_restrictions.extend(["".join(s) for s in r.req.specs])
        else:
            r = requirements[k]
            k_restrictions.extend(["".join(s) for s in r.req.specs])
        if k_restrictions:
            print("- %s %s" % (k, k_restrictions))
        else:
            print("- %s" % (k))
def match_available(req, available):

    def _detail(m_req, origin_url):
        filename = os.path.basename(origin_url)
        path = os.path.join(os.getcwd(), '.download', filename)
        if not os.path.exists(path):
            resp = requests.get(origin_url)
            with open(path, 'wb') as fh:
                fh.write(resp.content)
            print("INFO: Downloaded %s -> %s (%s)" % (origin_url, path,
                                                      os.path.getsize(path)))
        return get_archive_details(path)

    # Try the newest versions first.
    looked_in = []
    for a in reversed(available):
        v = a[0]
        if v in req:
            line = "%s==%s" % (req.key, v)
            m_req = pip_req.InstallRequirement.from_line(line)
            m_req.details = _detail(m_req, a[-1])
            return m_req
        else:
            looked_in.append(v)
    raise NotFoundException("No requirement found that"
                            " matches '%s' (tried %s)" % (req, looked_in))
def find_match(pkg_name, req):
    return match_available(req.req, find_versions(pkg_name))


def is_compatible_alongside(req, gathered, requirements):
    # FIXME: actually do something here...
    return True
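# A minimal sketch (not part of the original prototype) of what the FIXME
# above could become: reject a candidate whose selected version violates
# any specifier currently recorded for the same package name.
#
#     def is_compatible_alongside(req, gathered, requirements):
#         version = req.details['version']
#         for sibling in requirements.get(req_key(req), []):
#             if version not in sibling.req:
#                 return False
#         return True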
def probe(requirements, gathered):
    if not requirements:
        return {}
    requirements = copy.deepcopy(requirements)
    gathered = copy.deepcopy(gathered)
    # Pick one of the requirements, get a version that works with the
    # current known siblings (other requirements that are requested
    # alongside this requirement) and then recurse, trying to get another
    # requirement that will work; if this is not possible, backtrack and
    # try a different version instead (and repeat)...
    pkg_name, pkg_requirements = requirements.popitem()
    for req in pkg_requirements:
        print("Probing '%s'" % (req.req))
        m = find_match(pkg_name, req)
        old_requirements = copy.deepcopy(requirements)
        if m.details['dependencies']:
            # The match brings in its own dependencies; add them to the
            # set of requirements that still needs to be satisfied.
            for m_dep in m.details['dependencies']:
                m_req = pip_req.InstallRequirement.from_line(m_dep)
                requirements.setdefault(req_key(m_req), []).append(m_req)
        local_compat = is_compatible_alongside(m, gathered, requirements)
        if local_compat:
            gathered[pkg_name] = m
            try:
                result = probe(requirements, gathered)
            except RequirementException:
                # Dead end; undo this choice and try the next candidate.
                gathered.pop(pkg_name)
                requirements = old_requirements
            else:
                gathered.update(result)
                return gathered
        else:
            requirements = old_requirements
    pkg_requirements = [req.req for req in pkg_requirements]
    raise RequirementException("No requirement found that"
                               " matches '%s'" % (pkg_requirements))
def main():
    parser = create_parser()
    options = parser.parse_args()
    initial = parse_requirements(options)
    print("Initial package set:")
    dump_requirements(initial)
    # TODO: use these to download (via a request to pip) the dependencies we
    # find that might work, so that we can then extract their desired/needed
    # requirements and then continue probing....
    for d in ['.download']:
        if not os.path.isdir(os.path.join(os.getcwd(), d)):
            os.makedirs(os.path.join(os.getcwd(), d))
    matches = probe(initial, {})
    print("Deep package set:")
    dump_requirements(matches)


if __name__ == "__main__":
    main()
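# To try this out, an invocation along these lines should work (the
# requirements file name is just an example); the script prints the initial
# package set, probes PyPI for matching versions, and then prints the fully
# expanded ("deep") package set:
#
#     $ echo "six>=1.5" > test-requirements.txt
#     $ python Pippin.py -r test-requirements.txt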