Created
November 3, 2016 05:51
-
-
Save signalpillar/67e294ee1388a5814ab593ae361e3304 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# encoding: utf-8 | |
""" | |
Used resources | |
- https://wiki.python.org/moin/PyPIXmlRpc | |
""" | |
# Python: >= 3.5 | |
# std | |
import collections | |
import contextlib | |
import hashlib | |
import os | |
from pathlib import Path | |
import pprint | |
import re | |
import shutil | |
import stat | |
import tarfile | |
import tempfile | |
import typing | |
import zipfile | |
# 3rd-party | |
import requests | |
from packaging.version import Version | |
from packaging.specifiers import SpecifierSet | |
import xmlrpc.client | |
# import vcr | |
# Metadata plus download links for one (name, version) release on PyPI.
Release = collections.namedtuple("Release", ["data", "urls"])

# PYPI_CLIENT_VCR = vcr.VCR(record_mode='new_episodes',
#                           cassette_library_dir='fixtures/cassettes',
#                           match_on=['uri', 'method', 'body'])

# One downloadable artifact of a release, mirroring the fields returned by
# the XML-RPC ``release_urls`` call.
ReleaseLink = collections.namedtuple("ReleaseLink", [
    "python_version",
    "filename",
    "path",
    "size",
    "md5_digest",
    "url",
    "comment_text",
    "upload_time",
    "downloads",
    "has_sig",
    "packagetype",
])

# Requirement strings grouped by the phase in which they are needed.
Dependencies = collections.namedtuple("Dependencies", ["runtime", "test", "setup"])
class Pypi(object):
    """Client for the PyPI XML-RPC API (plus the /simple HTML index)."""

    class Error(Exception):
        """ Pypi service errors.
        """

    def __init__(self, client: xmlrpc.client.ServerProxy):
        super(Pypi, self).__init__()
        self.client = client

    @classmethod
    # @vcr.use_cassette("fixtures/cassettes/init_client")
    def connect(cls, url: str='https://pypi.python.org/pypi'):
        """Build a client for *url*; constructing the proxy itself does not
        perform any network I/O."""
        proxy = xmlrpc.client.ServerProxy(
            url, allow_none=True, use_builtin_types=True, encoding='utf8')
        return cls(proxy)

    # @vcr.use_cassette('fixtures/cassettes/all_packages', match_on=['body'])
    def get_all_packages_names(self) -> typing.Set[str]:
        """Return the name of every package registered on the index."""
        return set(self.client.list_packages())

    # @PYPI_CLIENT_VCR.use_cassette('all_versions')
    def _get_all_versions(self, name: str) -> typing.Sequence[str]:
        """Scrape every released version of *name* from its /simple page.

        Raises:
            Pypi.Error: when the page cannot be fetched (non-200 status).
        """
        def _parse(html_page_content):
            # A version is whatever sits between "<name>-" and a known
            # archive suffix in the anchor text of the simple-index page.
            return set(re.findall(
                r">{}\-(.+?)(?:\.zip|\.tar.gz|\-py.*?\.whl)<\/a>"
                .format(re.escape(name)),
                html_page_content,
                flags=re.IGNORECASE))
        result = requests.get("https://pypi.python.org/simple/{}".format(name))
        if result.status_code == 200:
            return sorted(_parse(result.text))
        # BUG fix: raised bare `Error`, which is not defined at module
        # scope and produced a NameError; raise the class exception.
        raise self.Error("Failed to get all versions for the {}".format(name))

    # @PYPI_CLIENT_VCR.use_cassette('latest_version')
    def get_latest_version(self, name: str) -> str:
        """Return the most recent release version of *name*.

        Raises:
            Pypi.Error: when the package has no releases.
        """
        versions = self.client.package_releases(name)
        if not versions:
            # BUG fix: the message used to say "latest error" instead of
            # "latest version".
            raise self.Error("Failed to find the latest version for '{}'".format(name))
        return versions[-1]

    def get_package_versions(self, name: str) -> typing.Sequence[str]:
        """Return all known versions of *name* (lexicographically sorted).

        Raises:
            Pypi.Error: when no version can be found at all.
        """
        versions = self._get_all_versions(name)
        if not versions:
            raise self.Error("Failed to get versions for '{}'".format(name))
        return versions

    # @PYPI_CLIENT_VCR.use_cassette('release_data')
    def get_release_data(self, name: str, version: str) -> Release:
        # NOTE(review): this returns the raw metadata mapping from the
        # XML-RPC call, not a Release tuple — annotation kept for
        # backward compatibility; confirm with callers.
        return self.client.release_data(name, version)

    # @PYPI_CLIENT_VCR.use_cassette('release_urls')
    def get_release_urls(self, name: str, version: str) -> typing.List:
        """Return the release's download links as ReleaseLink tuples.

        (BUG fix: the annotation claimed Mapping although a list is
        returned.)
        """
        return [
            ReleaseLink(**data)
            for data in self.client.release_urls(name, version)
        ]

    def get_release(self, name: str, version: str) -> Release:
        """Combine release metadata and download URLs into a Release."""
        release_data = self.get_release_data(name, version)
        if release_data:
            # Normalise the description to UTF-8 bytes up front.
            release_data['description'] = release_data.get('description', '').encode('utf-8')
        return Release(release_data, self.get_release_urls(name, version))
#: Shared module-level client, created at import time.  Building the
#: XML-RPC proxy does not itself hit the network.
DEFAULT_CLIENT = Pypi.connect()
#: Default path where downloaded archive files will be stored
DEFAULT_DOWNLOAD_DIR = Path("/tmp/archivespypi")
#: Archive suffixes this module knows how to unpack.
SUPPORTED_ARCHIVE_EXT = ("tar.gz", 'zip',)
def _extract_zip(path: Path, dst: Path): | |
with zipfile.ZipFile(str(path)) as fd: | |
fd.extractall(path=str(dst)) | |
def _extract_tar_gz(archive_path: Path, dst: Path): | |
with tarfile.open(str(archive_path)) as archive: | |
for entry in archive.getmembers(): | |
archive.extract(entry, path=str(dst)) | |
#: Dispatch table from the archive's *last* extension to its unpacker.
#: os.path.splitext("x.tar.gz") yields ".gz", hence the first key stands
#: for ".tar.gz" archives (see temporarely_extracted).
EXTRACT_FN_BY_EXT = {
    '.gz': _extract_tar_gz, # for tar.gz
    '.zip': _extract_zip,
    # '.egg': _extract_zip,
}
def get_setup_details(release: Release) -> Dependencies:
    """Download *release*, unpack it temporarily, and harvest the keyword
    arguments its setup.py passes to setup()."""
    archive_path = download_archive(release, target_dir=DEFAULT_DOWNLOAD_DIR)
    with temporarely_extracted(archive_path) as unpacked_dir:
        return run_mocked_setup(unpacked_dir)
def find_link_to_download(release: Release) -> typing.Optional[ReleaseLink]:
    """Pick the first release URL whose file type we know how to unpack.

    Raises:
        ValueError: when no link with a supported archive type exists.
    """
    supported_suffixes = tuple(EXTRACT_FN_BY_EXT)
    for link in release.urls:
        if link.filename.endswith(supported_suffixes):
            return link
    print("Not found supported URL between:\n{}".format('\n'.join(map(str, release.urls))))
    raise ValueError("Failed to find link for downloading")
def download_archive(release: Release, target_dir: Path) -> Path:
    """Ensure the archive of *release* exists under *target_dir*.

    The download is skipped when the file is already cached locally.
    Returns the path of the archive file.
    """
    link = find_link_to_download(release)
    destination = target_dir / Path(link.filename)
    if destination.is_file():
        print("File {} is already downloaded".format(destination))
    else:
        print("Download '{}'".format(link))
        download_file(link, destination)
    return destination
def download_file(link: ReleaseLink, target_filepath: Path) -> None:
    """Stream *link.url* into *target_filepath* and verify the download.

    Verification prefers the MD5 digest advertised by PyPI and falls back
    to a size comparison when only the size is known.

    Raises:
        AssertionError: when the downloaded file fails verification.
    """
    def verify_md5_digest(path: Path, expected_digest: str) -> None:
        with path.open('rb') as fd:
            actual = hashlib.md5(fd.read())
        assert expected_digest == actual.hexdigest(), (
            "MD5 digest for downloaded differs. Actual {!r} != {!r}."
            .format(actual, expected_digest))

    def verify_file_size(path: Path, expected_size: int) -> None:
        # BUG fix: the original read `os.stat(path).size` (no such
        # attribute) and then compared the stat result with itself,
        # never against the advertised size.
        actual_size = os.stat(str(path))[stat.ST_SIZE]
        assert actual_size == expected_size, (
            "Size of downloaded file differs. Actual {!r} != {!r}."
            .format(actual_size, expected_size))

    r = requests.get(link.url, stream=True)
    with target_filepath.open('wb') as f:
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:  # filter out keep-alive new chunks
                f.write(chunk)
    if link.md5_digest:
        verify_md5_digest(target_filepath, link.md5_digest)
    elif link.size:
        verify_file_size(target_filepath, link.size)
@contextlib.contextmanager
def temporarely_extracted(archive_path: Path):
    """Context manager: extract *archive_path* into a fresh temporary
    directory and yield the single top-level entry it produced (the usual
    sdist layout).  The temporary directory is removed on exit.

    Raises:
        ValueError: when the archive extension is not supported.
        AssertionError: when nothing was extracted.
    """
    _, ext = os.path.splitext(str(archive_path))
    extract_fn = EXTRACT_FN_BY_EXT.get(ext)
    if not extract_fn:
        raise ValueError("Extract function for {} is not found.".format(ext))
    with temp_dir() as path:
        extract_fn(archive_path, path)
        source_dir = next(path.iterdir(), None)
        # we expect that it will be extracted as one directory in a temp dir
        # BUG fix: ','.join() over Path objects raised TypeError right
        # where the assertion message should have rendered; stringify
        # each entry first.
        assert source_dir, (
            "Assumption about extracted dirctory didn't work for archive: {!r}. "
            "Content of the temporary folder is: {}".format(
                archive_path,
                ','.join(str(entry) for entry in path.iterdir())))
        yield source_dir
@contextlib.contextmanager
def temp_dir() -> typing.Iterator[Path]:
    """Context manager: yield a freshly created temporary directory as a
    Path and remove it (recursively) on exit.
    """
    # BUG fix: mkdtemp() used to be called inside the ``try``; had it
    # raised, the ``finally`` clause would have hit a NameError on
    # ``tempdir`` and masked the original exception.
    tempdir = tempfile.mkdtemp()
    try:
        yield Path(tempdir)
    finally:
        if os.path.isdir(tempdir):
            shutil.rmtree(tempdir)
def run_mocked_setup(source_dir: Path) -> Dependencies:
    """Execute the package's setup.py with distutils/setuptools ``setup``
    mocked out and return the keyword arguments it was called with.

    NOTE(review): despite the annotation, the value returned is the pruned
    ``setup(**kwargs)`` dict, not a ``Dependencies`` tuple — confirm with
    callers before changing either side.
    """
    print('Run mocked setup: {}'.format(source_dir))
    # Imported lazily: only needed when a setup.py is actually executed.
    # NOTE(review): `mock` is the third-party backport (stdlib has
    # unittest.mock), `imp` is deprecated, and `json` appears unused here.
    import mock
    import imp
    import json
    # Patch both possible setup() entry points, then run setup.py from its
    # own directory (many setup scripts rely on relative paths).
    with mock.patch("distutils.core.setup") as distutils_setup, \
            mock.patch("setuptools.setup") as setuptools_setup, \
            cd(source_dir):
        error = None
        file_, path, description = imp.find_module('setup', ['.'])
        try:
            # Load as "__main__" so `if __name__ == "__main__"` guards in
            # setup.py still trigger the setup() call.
            module = imp.load_module('__main__', file_, path, description)
            # module = imp.load_source("__main__", "setup.py")
        except ImportError as error:
            # NOTE(review): effectively a no-op — `error` is never
            # inspected afterwards, so ImportErrors are swallowed.
            error = error
        # Determine which of the two patched entry points the script used.
        fn, original = (
            (distutils_setup, 'distutils')
            if distutils_setup.called
            else (setuptools_setup, 'setuptools'))
        if not fn.called:
            raise Exception("None of the mocks is called.")
        print("Called one: {}".format(original))
        call_args = fn.call_args
        # call_args is (args, kwargs); setup() is conventionally invoked
        # with keyword arguments only.
        kwargs = call_args[1]
        # remove version field as it may call imports to the project packages
        # see example unittest2==1.1.0
        # kwargs.pop("version", None)
        # Drop bulky fields that are irrelevant for dependency resolution.
        params_to_ignore = "classifiers license long_description".split()
        for param in params_to_ignore:
            kwargs.pop(param, None)
        # Normalise both requirement lists to flat tuples of strings.
        kwargs['install_requires'] = reqs_to_tuple(kwargs.get('install_requires'))
        kwargs['setup_requires'] = reqs_to_tuple(kwargs.get('setup_requires'))
        return kwargs
def reqs_to_tuple(reqs):
    """Normalise a setup() requirements value into a flat tuple.

    Accepts None (-> empty tuple), a single requirement string, a tuple
    of lists (concatenated), or any other iterable of requirements.
    """
    if reqs is None:
        return ()
    if isinstance(reqs, str):
        return (reqs,)
    if isinstance(reqs, tuple):
        # Tuples are expected to hold lists; concatenate them into one.
        return tuple(sum(reqs, []))
    return tuple(reqs)
@contextlib.contextmanager
def cd(path: Path):
    """Context manager: chdir into *path*, restoring the previous working
    directory on exit (even on error)."""
    previous_cwd = os.getcwd()
    try:
        os.chdir(str(path))
        yield
    finally:
        os.chdir(previous_cwd)
def get_dep_tree(name: str, version: str=None, client: Pypi=None, all_packages=None):
    """Recursively resolve the dependency tree of *name*.

    Args:
        name: package name (normalised against the index's casing).
        version: exact version to inspect; latest when omitted.
        client: Pypi client; DEFAULT_CLIENT when omitted.
        all_packages: lowercase-name -> canonical-name map, built once on
            the first call and threaded through the recursion.

    Returns:
        dict with keys 'name', 'version' and 'reqs' (a list of child
        trees, each carrying an extra 'version_spec' key).

    Raises:
        ValueError: when *name* is unknown to the index.
    """
    client = client or DEFAULT_CLIENT
    if not all_packages:
        print("Get all packages")
        all_packages = {
            name.lower(): name
            for name in client.get_all_packages_names()
        }
    normalised_name = all_packages.get(name.lower())
    if not normalised_name:
        raise ValueError("Package '{}' doesn't exist".format(name))
    elif normalised_name != name:
        print("Normalised name from '{}' to '{}'".format(name, normalised_name))
        name = normalised_name
    if not version:
        versions = client.get_package_versions(name)
        print('versions:{} '.format(versions))
        version = versions[-1]
        print("Found versions {}. Took latest one {}".format(', '.join(versions), version))
    release = client.get_release(name, version)
    details = get_setup_details(release)
    # run_mocked_setup guarantees both keys exist as tuples.
    requirements = details.get('install_requires') + details.get('setup_requires')
    reqs = []
    for version_spec in set(requirements):
        req_name, spec = parse_to_semantic_spec(version_spec)
        normalised_req_name = all_packages.get(req_name.lower())
        # BUG fix: when a requirement is unknown to the index the lookup
        # yields None; the original unconditionally renamed the
        # requirement to None and failed later in a confusing place.
        if normalised_req_name and normalised_req_name != req_name:
            print("Normalised name from '{}' to '{}'".format(req_name, normalised_req_name))
            req_name = normalised_req_name
        print("Requirement: '{name}' with spec: '{spec}'".format(name=req_name, spec=spec))
        if spec:
            spec = SpecifierSet(spec)
            versions = client.get_package_versions(req_name)
            print("Versions: {}".format(', '.join(versions)))
            candidates = list(spec.filter(versions))
            req_version = str(candidates[-1])
            print("Filtered versions: {}. Take latest: {}"
                  .format(', '.join(candidates), req_version))
        else:
            req_version = client.get_latest_version(req_name)
            print("Take latest: {}".format(req_version))
        req_dep_tree = get_dep_tree(
            req_name,
            version=str(req_version),
            client=client,
            all_packages=all_packages)
        req_dep_tree['version_spec'] = str(spec)
        reqs.append(req_dep_tree)
    return {
        'name': name,
        'version': version,
        'reqs': reqs
    }
#: "NAME<spec>" requirement pattern.  The name part admits the dots and
#: dashes that occur in real package names (e.g. "zope.interface"); the
#: original `\w+` split such names in the middle.  Raw string avoids the
#: invalid-escape warning of the old literal.
PYPI_VERSION_SPEC_RE = re.compile(r"^([\w.-]+)(.*)")


def parse_to_semantic_spec(version_spec: str) -> typing.Tuple[str, str]:
    """ Parse form "NAME<spec>" to pair of NAME and <spec>

    >>> parse_to_semantic_spec("requests>=2.0")
    ('requests', '>=2.0')

    Raises:
        ValueError: when *version_spec* does not start with a package name.
    """
    matched = PYPI_VERSION_SPEC_RE.match(version_spec)
    if matched:
        return matched.groups()
    raise ValueError("Cannot parse version spec from: '{}'".format(version_spec))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment