Created
January 22, 2015 02:47
-
-
Save harlowja/5e39ec5ca9e3f0d9a21f to your computer and use it in GitHub Desktop.
Pippin.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
import itertools | |
import os | |
from distutils import version as dist_version | |
from pip import req as pip_req | |
import pkg_resources | |
import argparse | |
import requests | |
import six | |
from six.moves import urllib | |
try: | |
from pip import util as pip_util | |
except ImportError: | |
# pip >=6 changed this location for some reason... | |
from pip import utils as pip_util | |
# PyPI JSON API endpoint template; ``%s`` is replaced with the URL-quoted
# project name. Use HTTPS on the canonical pypi.org host rather than relying
# on the legacy plain-HTTP pypi.python.org address being redirected.
_FINDER_URL_TPL = 'https://pypi.org/pypi/%s/json'
class RequirementException(Exception):
    """Raised when requirements cannot be parsed or cannot be satisfied."""
class NotFoundException(Exception):
    """Raised when no sdist url or no matching version can be found."""
def create_parser():
    """Build the command line parser for this tool.

    The parser accepts ``-r``/``--requirement`` followed by zero or more
    requirement file names; they are stored as a list under
    ``options.requirements`` (an empty list when the flag is not given).
    """
    requirement_opts = {
        "dest": "requirements",
        "nargs": "*",
        "default": [],
        "metavar": "<file>",
        "help": ("Analyze all the packages listed in the given"
                 " requirements file"),
    }
    cli = argparse.ArgumentParser()
    cli.add_argument("-r", "--requirement", **requirement_opts)
    return cli
def parse_requirements(options):
    """Parse every requirements file named in ``options.requirements``.

    Returns a dict mapping each requirement's key (``req.req.key``) to the
    list of parsed requirement objects that share that key, accumulated
    across all of the given files.

    Raises RequirementException (naming the offending file) if anything
    goes wrong while parsing a file.
    """
    by_key = {}
    for path in options.requirements:
        try:
            for parsed in pip_req.parse_requirements(path):
                key = parsed.req.key
                if key not in by_key:
                    by_key[key] = []
                by_key[key].append(parsed)
        except Exception as ex:
            raise RequirementException("Cannot parse `%s': %s" % (path, ex))
    return by_key
def find_versions(pkg_name):
    """Fetch the releases of ``pkg_name`` from the package index JSON API.

    Returns a list of ``(version_string, StrictVersion, Requirement,
    sdist_url)`` tuples, sorted in ascending ``StrictVersion`` order.

    Raises NotFoundException if any listed release has no ``sdist`` file.
    """
    url = _FINDER_URL_TPL % (urllib.parse.quote(pkg_name))
    resp = requests.get(url)
    resp_data = resp.json()
    releases = []
    for v, release_infos in six.iteritems(resp_data['releases']):
        # Keep the url of the (last listed) source distribution, if any.
        rel = None
        for r in release_infos:
            if r['packagetype'] == 'sdist':
                rel = r['url']
        if rel is None:
            raise NotFoundException("No sdist url found for package"
                                    " %s v%s" % (pkg_name, v))
        releases.append((str(v), dist_version.StrictVersion(v),
                         pkg_resources.Requirement.parse(v), rel))
    # The ``cmp`` builtin and ``sorted(cmp=...)`` only exist on Python 2;
    # a key function gives the same (stable) ordering on Python 2 and 3.
    return sorted(releases, key=lambda release: release[1])
def dump_requirements(requirements):
    """Print one ``- name [restrictions]`` line per entry of ``requirements``.

    Each value is either a single requirement object or a list/tuple of
    them; the version specs of each (``.req.specs``) are flattened into
    strings such as ``>=1.0``. Entries without any specs are printed as
    just ``- name``.
    """
    for name in requirements:
        entry = requirements[name]
        if isinstance(entry, (list, tuple)):
            # Several requirements share this name; skip the spec-less ones.
            spec_groups = [item.req.specs for item in entry if item.req.specs]
        else:
            spec_groups = [entry.req.specs]
        restrictions = ["".join(spec)
                        for group in spec_groups
                        for spec in group]
        if restrictions:
            print("- %s %s" % (name, restrictions))
        else:
            print("- %s" % (name))
def match_available(req, available):
    """Return an install requirement pinned to a release that fits ``req``.

    ``available`` is walked in reverse order and the first entry whose
    version string (element 0) is contained in ``req`` wins; the result is
    built from a ``<key>==<version>`` requirement line.

    Raises NotFoundException, listing every version tried, when none fit.
    """
    tried = []
    for release in reversed(available):
        version = release[0]
        if version not in req:
            tried.append(version)
            continue
        pinned_line = "%s==%s" % (req.key, version)
        return pip_req.InstallRequirement.from_line(pinned_line)
    raise NotFoundException("No requirement found that"
                            " matches '%s' (tried %s)" % (req, tried))
def find_match(pkg_name, req):
    """Resolve ``req`` against the releases found for ``pkg_name``."""
    wanted = req.req
    candidates = find_versions(pkg_name)
    return match_available(wanted, candidates)
def probe(requirements, gathered):
    """Recursively pin a concrete version for every requested package.

    ``requirements`` maps a package name to the list of requirements asked
    of it; ``gathered`` maps the package names already pinned to their
    chosen match. Neither argument is mutated (both are copied first).

    Returns ``{}`` when ``requirements`` is empty, otherwise a dict holding
    ``gathered`` plus a pinned match for each package in ``requirements``.

    Raises RequirementException when no requirement of the picked package
    leads to a full solution.
    """
    if not requirements:
        return {}
    requirements = requirements.copy()
    gathered = gathered.copy()
    # Pick one of the requirements, get a version that works with the current
    # known siblings (other requirements that are requested along side this
    # requirement) and then recurse trying to get another requirement that
    # will work, if this is not possible, backtrack and try a different
    # version instead (and repeat)...
    pkg_name, pkg_requirements = requirements.popitem()
    for req in pkg_requirements:
        try:
            m = find_match(pkg_name, req)
        except (RequirementException, NotFoundException):
            # A failed lookup is reported as NotFoundException, which is not
            # a RequirementException subclass; catch both so that a failing
            # candidate moves on to the next one instead of aborting.
            continue
        # TODO: Check if compatible with what we already gathered...
        #
        # TODO: download the selected version, add all of its requirements
        # into the currently being worked on requirements dictionary,
        # and then continue...
        #
        # Freeze at this version and continue probing other packages...
        gathered[pkg_name] = m
        try:
            result = probe(requirements, gathered)
        except RequirementException:
            # Dead end further down; unfreeze and try the next candidate.
            gathered.pop(pkg_name)
        else:
            gathered.update(result)
            return gathered
    pkg_requirements = [req.req for req in pkg_requirements]
    # Wrap in a 1-tuple so the list is always formatted as a single value.
    raise RequirementException("No requirement found that"
                               " matches '%s'" % (pkg_requirements,))
def main():
    """Command line entry point.

    Parses the arguments, prints the initially requested package set,
    makes sure the working directories exist under the current directory
    and finally prints the package set produced by probing.
    """
    options = create_parser().parse_args()
    initial = parse_requirements(options)
    print("Initial package set:")
    dump_requirements(initial)
    # TODO: use these to download (via a request to pip) the dependencies we
    # find that might work, so that we can then extract their desired/needed
    # requirements and then continue probing....
    for dir_name in ('.cache', '.build', '.download'):
        work_dir = os.path.join(os.getcwd(), dir_name)
        if os.path.isdir(work_dir):
            continue
        os.makedirs(os.path.join(os.getcwd(), dir_name))
    matches = probe(initial, {})
    print("Deep package set:")
    dump_requirements(matches)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment