|
#!/usr/bin/env python3 |
|
from email.message import EmailMessage |
|
from email.parser import BytesParser |
|
from io import BytesIO |
|
from multiprocessing.dummy import Pool |
|
from operator import attrgetter, methodcaller |
|
from platform import python_version |
|
from sys import argv |
|
from urllib.parse import urlparse |
|
from zipfile import ZipFile |
|
|
|
import requests |
|
import html5lib |
|
from packaging.specifiers import SpecifierSet |
|
from packaging.version import Version, InvalidVersion |
|
from packaging.requirements import Requirement |
|
from packaging.utils import canonicalize_name |
|
from resolvelib import BaseReporter, Resolver |
|
|
|
from extras_provider import ExtrasProvider |
|
|
|
# Version of the running interpreter, used to filter out files whose
# ``data-requires-python`` excludes it.  Non-release builds may report a
# trailing "+" (e.g. "3.12.0+"), which is not a valid PEP 440 version and
# would make Version() raise at import time, so it is stripped first.
PYTHON_VERSION = Version(python_version().rstrip('+'))
|
|
|
|
|
class Candidate:
    """A single wheel file that may satisfy a requirement.

    The wheel's METADATA is not fetched at construction time; it is
    downloaded on first access of :attr:`metadata` (or explicitly through
    :meth:`fetch_metadata`) and then kept on the instance.

    Attributes:
        name: Canonicalized project name.
        version: Version object/value supplied by the caller.
        url: Location the wheel is downloaded from.
        extras: Iterable of extras this candidate was requested with.
        provider: Object exposing ``download()``; called before this
            candidate fetches its own metadata.
    """

    def __init__(self, name, version, url, extras, provider):
        self.name, self.version = canonicalize_name(name), version
        self.url, self.extras, self.provider = url, extras, provider  # XXX
        self._metadata = None

    def __repr__(self):
        if not self.extras:
            return f'<{self.name}=={self.version}>'
        return f"<{self.name}[{','.join(self.extras)}]=={self.version}>"

    @property
    def metadata(self):
        """Return the parsed METADATA headers, downloading them if needed."""
        if self._metadata is None:
            # Let the provider run its pending downloads first ...
            self.provider.download()  # XXX
            # ... then fetch our own, in case this candidate was not one
            # of them (fetch_metadata is a no-op when already populated).
            self.fetch_metadata()
        return self._metadata

    def fetch_metadata(self):
        """Download the wheel and store its METADATA headers on the instance.

        Does nothing when the metadata is already present.  If the archive
        contains no ``*.dist-info/METADATA`` member, an empty
        ``EmailMessage`` is stored instead.
        """
        if self._metadata is not None:
            return
        print(f'Downloading {self.name}=={self.version}')
        data = requests.get(self.url).content
        with ZipFile(BytesIO(data)) as archive:
            for member in archive.namelist():
                if member.endswith('.dist-info/METADATA'):
                    # Close the member handle once parsed instead of
                    # leaving it open until garbage collection.
                    with archive.open(member) as fp:
                        self._metadata = BytesParser().parse(
                            fp, headersonly=True)
                    return
        self._metadata = EmailMessage()

    @property
    def requires_python(self):
        """Return the raw ``Requires-Python`` header, or None if absent."""
        return self.metadata.get('Requires-Python')

    @property
    def dependencies(self):
        """Yield each applicable ``Requires-Dist`` entry as a Requirement.

        A requirement without a marker always applies.  One with a marker
        applies when the marker evaluates true for at least one of this
        candidate's extras (or for the empty extra when there are none).
        Each requirement is yielded at most once, even if several extras
        satisfy its marker.
        """
        extras = self.extras if self.extras else ['']
        for line in self.metadata.get_all('Requires-Dist', []):
            req = Requirement(line)
            if req.marker is None or any(
                    req.marker.evaluate({'extra': e}) for e in extras):
                yield req
|
|
|
|
|
class Downloader:
    """Lists wheels from the PyPI simple index and batches metadata fetches.

    Attributes:
        soon_needed: Maps candidate name -> candidate whose metadata should
            be fetched on the next :meth:`download` call.
        cached_candidates: Maps (name, version, frozenset(extras)) to the
            Candidate already built for that combination.
    """

    def __init__(self, *args, **kwargs):
        self.soon_needed, self.cached_candidates = {}, {}

    def need_soon(self, candidate):
        """Queue *candidate* for the next batch download (one per name)."""
        self.soon_needed[candidate.name] = candidate

    def download(self):  # XXX
        """Fetch metadata for all queued candidates in a thread pool.

        The queue is emptied afterwards.  Returns immediately when nothing
        is queued, so no worker threads are started needlessly.
        """
        if not self.soon_needed:
            return
        with Pool(10) as pool:
            pool.map(methodcaller('fetch_metadata'), self.soon_needed.values())
        self.soon_needed = {}

    def get_from_pypi(self, project, extras):
        """Yield a candidate for every usable wheel link of *project*.

        Links are skipped when they have no ``href``, declare a
        ``data-requires-python`` that excludes the running interpreter, are
        not ``.whl`` files, have a filename without a version segment, or
        carry a version that cannot be parsed.

        Args:
            project: Project name used to build the simple-index URL.
            extras: Extras passed through to each created candidate.
        """
        index_url = 'https://pypi.org/simple/{}'.format(project)
        data = requests.get(index_url).content
        doc = html5lib.parse(data, namespaceHTMLElements=False)
        for anchor in doc.findall('.//a'):
            url = anchor.attrib.get('href')
            if not url:
                # Anchor without a target: nothing to download.
                continue
            # Skip items that need a different Python version
            py_req = anchor.attrib.get('data-requires-python')
            if py_req:
                try:
                    spec = SpecifierSet(py_req)
                except ValueError:
                    # packaging's InvalidSpecifier is a ValueError; an
                    # unparsable constraint is ignored rather than
                    # aborting the whole listing.
                    spec = None
                if spec is not None and PYTHON_VERSION not in spec:
                    continue

            path = urlparse(url).path
            filename = path.rpartition('/')[-1]
            # We only handle wheels
            if not filename.endswith('.whl'):
                continue
            parts = filename[:-4].split('-')
            if len(parts) < 2:
                # No "-<version>" segment: not a usable wheel filename.
                continue
            name, version = parts[:2]
            # TODO: Handle compatibility tags?
            try:
                version = Version(version)
            except InvalidVersion:
                # Ignore files with invalid versions
                continue
            yield self.make_candidate(name, version, url, extras)  # XXX

    def make_candidate(self, name, version, url, extras):
        """Return the cached Candidate for this combination, creating it once.

        The extras are part of the cache key so that a candidate built for
        one set of extras is never handed back for a different set.
        """
        key = (name, version, frozenset(extras or ()))
        if key not in self.cached_candidates:
            self.cached_candidates[key] = Candidate(
                name, version, url, extras, self)
        return self.cached_candidates[key]
|
|
|
|
|
class PyPIProvider(ExtrasProvider, Downloader):
    """Resolver provider that finds wheel candidates on PyPI."""

    def identify(self, requirement_or_candidate):
        """Return the canonical project name used as the resolver key."""
        raw_name = requirement_or_candidate.name
        return canonicalize_name(raw_name)

    def get_extras_for(self, requirement_or_candidate):
        """Return the object's extras as a sorted tuple."""
        # Extras is a set, which is not hashable
        ordered = sorted(requirement_or_candidate.extras)
        return tuple(ordered)

    def get_base_requirement(self, candidate):
        """Return a requirement pinning *candidate*'s name and version."""
        pinned = '{0.name}=={0.version}'.format(candidate)
        return Requirement(pinned)

    def get_preference(self, resolution, candidates, information):
        """Return the number of candidates as the preference value."""
        return len(candidates)

    def find_matches(self, requirements):
        """Return candidates satisfying every requirement, newest first.

        The first (highest-version) match, if any, is queued via
        ``need_soon`` so its metadata is fetched with the next batch.

        Raises:
            RuntimeError: If *requirements* is empty, or any requirement
                after the first one carries extras.
        """
        if not requirements:
            raise RuntimeError('resolver promises at least one requirement')
        for later in requirements[1:]:
            if later.extras:
                raise RuntimeError('extras not supported in this example')

        project = canonicalize_name(requirements[0].name)
        # Need to pass the extras to the search, so they
        # are added to the candidate at creation - we
        # treat candidates as immutable once created.
        matching = [
            found
            for found in self.get_from_pypi(project, set())
            if all(found.version in req.specifier for req in requirements)
        ]
        matching.sort(key=attrgetter('version'), reverse=True)
        if matching:  # empty usually when only sdist provided
            self.need_soon(matching[0])  # XXX
        return matching

    def is_satisfied_by(self, requirement, candidate):
        """Tell whether *candidate* has the right name and an allowed version."""
        same_project = canonicalize_name(requirement.name) == candidate.name
        return same_project and candidate.version in requirement.specifier

    def get_dependencies(self, candidate):
        """Return the candidate's dependencies as a list."""
        return [*candidate.dependencies]
|
|
|
|
|
def main():
    """Resolve the projects named on the command line and print the result.

    Prints a usage line and returns when no project name is given.
    Otherwise prints the pinned version of every resolved project followed
    by each graph node and its children.
    """
    if len(argv) < 2:
        print('Usage:', argv[0], '<PyPI project name(s)>')
        return None

    # Create the (reusable) resolver
    # from the custom provider and a default reporter.
    provider = PyPIProvider()
    reporter = BaseReporter()
    resolver = Resolver(provider, reporter)

    # Kick off the resolution process, and get the final result.
    wanted = [Requirement(arg) for arg in argv[1:]]
    result = resolver.resolve(wanted)

    # Display the resolution result.
    print('\n--- Pinned Candidates ---')
    for project, pinned in result.mapping.items():
        print(f'{project} {pinned.version}')

    print('\n--- Dependency Graph ---')
    graph = result.graph
    for node in graph:
        children = ', '.join(graph.iter_children(node))
        # A node without children is shown as pointing to None.
        print(f"{node} -> {children or None}")
|
|
|
|
|
# Run the resolver only when executed as a script.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # On Ctrl-C, just end the current output line instead of showing
        # a traceback.
        print()
This comment has been minimized.
This program is derived from ResolveLib's examples and is licensed under the same ISC license.