Skip to content

Instantly share code, notes, and snippets.

@signalpillar
Created November 3, 2016 05:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save signalpillar/67e294ee1388a5814ab593ae361e3304 to your computer and use it in GitHub Desktop.
Save signalpillar/67e294ee1388a5814ab593ae361e3304 to your computer and use it in GitHub Desktop.
# encoding: utf-8
"""
Used resources
- https://wiki.python.org/moin/PyPIXmlRpc
"""
# Python: >= 3.5
# std
import collections
import contextlib
import hashlib
import os
from pathlib import Path
import pprint
import re
import shutil
import stat
import tarfile
import tempfile
import typing
import zipfile
# 3rd-party
import requests
from packaging.version import Version
from packaging.specifiers import SpecifierSet
import xmlrpc.client
# import vcr
# A single PyPI release as assembled by Pypi.get_release():
#   data - the value returned by the client's ``release_data`` call
#   urls - the list of ReleaseLink entries built by Pypi.get_release_urls()
Release = collections.namedtuple("Release", "data urls")
# PYPI_CLIENT_VCR = vcr.VCR(record_mode='new_episodes',
# cassette_library_dir='fixtures/cassettes',
# match_on=['uri', 'method', 'body'])
# One downloadable file of a release. Pypi.get_release_urls() builds these from
# the dictionaries returned by the client's ``release_urls`` call, so the field
# names mirror the keys of those dictionaries.
ReleaseLink = collections.namedtuple(
    "ReleaseLink",
    [
        "python_version",
        "filename",
        "path",
        "size",
        "md5_digest",
        "url",
        "comment_text",
        "upload_time",
        "downloads",
        "has_sig",
        "packagetype",
    ],
)
# Grouping of dependencies into runtime / test / setup.
# NOTE(review): in the visible code this type is only used as a return
# annotation; nothing visible constructs an instance of it.
Dependencies = collections.namedtuple("Dependencies", "runtime test setup")
class Pypi(object):
    """Client for PyPI built on an XML-RPC proxy plus the "simple" HTML index.

    The XML-RPC proxy is used for package names, release metadata and release
    URLs; the list of all versions of a package is scraped from the "simple"
    index page with ``requests``.
    """

    class Error(Exception):
        """ Pypi service errors.
        """

    def __init__(self, client: xmlrpc.client.ServerProxy):
        """Store the XML-RPC proxy used for all remote calls."""
        super().__init__()
        self.client = client

    @classmethod
    # @vcr.use_cassette("fixtures/cassettes/init_client")
    def connect(cls, url: str='https://pypi.python.org/pypi'):
        """Alternate constructor: build the XML-RPC proxy for ``url``."""
        proxy = xmlrpc.client.ServerProxy(
            url, allow_none=True, use_builtin_types=True, encoding='utf8')
        return cls(proxy)

    # @vcr.use_cassette('fixtures/cassettes/all_packages', match_on=['body'])
    def get_all_packages_names(self) -> typing.Set[str]:
        """Return the set of package names reported by ``list_packages``."""
        return set(self.client.list_packages())

    @staticmethod
    def _parse_versions(name: str, html_page_content: str) -> typing.Set[str]:
        """Extract version strings from the link texts of a "simple" page.

        Recognises link texts of the form ``<name>-<version>`` followed by
        ``.zip``, ``.tar.gz`` or ``-py....whl``; matching is case-insensitive.
        ``[^<>]`` keeps a match inside one link text, so a file with an
        unsupported extension cannot swallow the following links.
        """
        pattern = r">{}-([^<>]+?)(?:\.zip|\.tar\.gz|-py[^<>]*?\.whl)</a>".format(
            re.escape(name))
        return set(re.findall(pattern, html_page_content, flags=re.IGNORECASE))

    @staticmethod
    def _version_sort_key(version: str):
        """Sort key ordering versions by PEP 440 rules instead of as text.

        Strings that are not valid PEP 440 versions sort (as plain text)
        before every valid version, so ``[-1]`` of a sorted list is the
        highest parseable version whenever one exists.
        """
        # Local import keeps this helper self-contained; ``packaging`` is
        # already a dependency of this module.
        from packaging.version import InvalidVersion, Version
        try:
            return (1, Version(version))
        except InvalidVersion:
            return (0, version)

    # @PYPI_CLIENT_VCR.use_cassette('all_versions')
    def _get_all_versions(self, name: str) -> typing.Sequence[str]:
        """Return all versions of ``name`` found on its "simple" page, ascending.

        Raises:
            Pypi.Error: if the index page does not answer with HTTP 200.
        """
        result = requests.get("https://pypi.python.org/simple/{}".format(name))
        if result.status_code != 200:
            raise self.Error("Failed to get all versions for the {}".format(name))
        return sorted(
            self._parse_versions(name, result.text),
            key=self._version_sort_key)

    # @PYPI_CLIENT_VCR.use_cassette('latest_version')
    def get_latest_version(self, name: str) -> str:
        """Return the last entry of the client's ``package_releases`` result.

        Raises:
            Pypi.Error: if no releases are reported for ``name``.
        """
        versions = self.client.package_releases(name)
        if not versions:
            raise self.Error("Failed to find the latest version for '{}'".format(name))
        return versions[-1]

    def get_package_versions(self, name: str) -> typing.Sequence[str]:
        """Return the non-empty, ascending list of versions of ``name``.

        Raises:
            Pypi.Error: if no version could be found.
        """
        versions = self._get_all_versions(name)
        if not versions:
            raise self.Error("Failed to get versions for '{}'".format(name))
        return versions

    # @PYPI_CLIENT_VCR.use_cassette('release_data')
    def get_release_data(self, name: str, version: str) -> typing.Mapping:
        """Return the client's ``release_data`` result for the release."""
        return self.client.release_data(name, version)

    # @PYPI_CLIENT_VCR.use_cassette('release_urls')
    def get_release_urls(self, name: str, version: str) -> typing.List["ReleaseLink"]:
        """Return one ReleaseLink per file reported by ``release_urls``.

        Only the keys that ReleaseLink declares are used: extra keys in the
        service response are ignored and missing ones become ``None``, so a
        change in the response schema does not raise ``TypeError``.
        """
        return [
            ReleaseLink(**{field: data.get(field) for field in ReleaseLink._fields})
            for data in self.client.release_urls(name, version)
        ]

    def get_release(self, name: str, version: str) -> "Release":
        """Return the release metadata together with its download links.

        When metadata is present its ``description`` is replaced by the
        UTF-8 encoded bytes of the text; a missing or ``None`` description
        is treated as the empty string.
        """
        release_data = self.get_release_data(name, version)
        if release_data:
            description = release_data.get('description') or ''
            release_data['description'] = description.encode('utf-8')
        return Release(release_data, self.get_release_urls(name, version))
# Client created at import time with Pypi.connect()'s default URL;
# get_dep_tree() falls back to it when no client is passed.
DEFAULT_CLIENT = Pypi.connect()
#: Default path where downloaded archive files will be stored
DEFAULT_DOWNLOAD_DIR = Path("/tmp/archivespypi")
# NOTE(review): not referenced anywhere else in the visible code; the
# extraction dispatch is driven by EXTRACT_FN_BY_EXT instead.
SUPPORTED_ARCHIVE_EXT = ("tar.gz", 'zip',)
def _extract_zip(path: Path, dst: Path):
    """Unpack every member of the zip archive at ``path`` into ``dst``."""
    destination = str(dst)
    archive = zipfile.ZipFile(str(path))
    with archive:
        archive.extractall(path=destination)
def _extract_tar_gz(archive_path: Path, dst: Path):
    """Unpack the tar archive at ``archive_path`` into ``dst``.

    The archives handled here are downloaded from the network, so every
    member is checked before anything is written: a member whose path, or
    whose link target, resolves outside ``dst`` aborts the extraction.

    Raises:
        ValueError: if a member would be written outside ``dst``.
    """
    dst_root = os.path.realpath(str(dst))

    def is_inside(candidate: str) -> bool:
        resolved = os.path.realpath(candidate)
        return resolved == dst_root or resolved.startswith(dst_root + os.sep)

    with tarfile.open(str(archive_path)) as archive:
        members = archive.getmembers()
        for entry in members:
            # os.path.join discards dst_root when entry.name is absolute, so
            # absolute member names are rejected by the same check.
            target = os.path.join(dst_root, entry.name)
            if not is_inside(target):
                raise ValueError(
                    "Refusing to extract {!r} from {}: path escapes {}".format(
                        entry.name, archive_path, dst))
            if entry.issym():
                # Symlink targets are relative to the link's own directory.
                link_target = os.path.join(os.path.dirname(target), entry.linkname)
            elif entry.islnk():
                # Hard-link targets are relative to the archive root.
                link_target = os.path.join(dst_root, entry.linkname)
            else:
                continue
            if not is_inside(link_target):
                raise ValueError(
                    "Refusing to extract link {!r} -> {!r} from {}: "
                    "target escapes {}".format(
                        entry.name, entry.linkname, archive_path, dst))
        # extractall applies directory permissions and times only after all
        # members are written, so a read-only directory entry does not block
        # the files stored inside it.
        archive.extractall(path=str(dst), members=members)
# Dispatch table from file suffix to the function that unpacks such an archive.
# temporarely_extracted() looks up the suffix returned by os.path.splitext()
# (hence '.gz' rather than '.tar.gz'), and find_link_to_download() uses the
# keys as the accepted filename endings.
EXTRACT_FN_BY_EXT = {
'.gz': _extract_tar_gz, # for tar.gz
'.zip': _extract_zip,
# '.egg': _extract_zip,
}
def get_setup_details(release: Release) -> Dependencies:
    """Download the release archive and collect the arguments of its setup() call.

    The archive is stored under DEFAULT_DOWNLOAD_DIR, unpacked into a
    temporary directory, and the unpacked tree is handed to
    run_mocked_setup(), whose result is returned unchanged.

    NOTE(review): the annotation says ``Dependencies`` but the value comes
    straight from run_mocked_setup() - confirm the intended return type.
    """
    archive_path = download_archive(release, target_dir=DEFAULT_DOWNLOAD_DIR)
    with temporarely_extracted(archive_path) as extracted_root:
        setup_details = run_mocked_setup(extracted_root)
        return setup_details
def find_link_to_download(release: Release) -> typing.Optional[ReleaseLink]:
    """Return the first link of ``release`` whose file type can be extracted.

    A link qualifies when its filename ends with one of the keys of
    EXTRACT_FN_BY_EXT.

    Raises:
        ValueError: if none of the release links qualifies.
    """
    supported_suffixes = tuple(EXTRACT_FN_BY_EXT)
    chosen = None
    for candidate in release.urls:
        if candidate.filename.endswith(supported_suffixes):
            chosen = candidate
            break
    if chosen:
        return chosen
    print("Not found supported URL between:\n{}".format('\n'.join(map(str, release.urls))))
    raise ValueError("Failed to find link for downloading")
def download_archive(release: Release, target_dir: Path) -> Path:
    """Make sure the archive of ``release`` exists in ``target_dir``.

    The file is downloaded only when it is not already present.

    Args:
        release: release whose links are searched for a supported archive.
        target_dir: directory for the archive; created when missing.

    Returns:
        Path of the archive inside ``target_dir``.

    Raises:
        ValueError: propagated from find_link_to_download() when the release
            has no supported archive.
    """
    link = find_link_to_download(release)
    # Keep only the final path component: the filename comes from the remote
    # service and must not be able to point outside target_dir.
    target_file = target_dir / Path(link.filename).name
    if target_file.is_file():
        print("File {} is already downloaded".format(target_file))
        return target_file
    # Opening the target for writing fails if the directory does not exist.
    target_dir.mkdir(parents=True, exist_ok=True)
    print("Download '{}'".format(link))
    download_file(link, target_file)
    return target_file
def _verify_md5_digest(path: Path, expected_digest: str) -> None:
    """Raise ValueError unless the MD5 hex digest of ``path`` is ``expected_digest``."""
    digest = hashlib.md5()
    with path.open('rb') as fd:
        # Hash in chunks so large archives are not loaded into memory at once.
        for chunk in iter(lambda: fd.read(65536), b''):
            digest.update(chunk)
    actual = digest.hexdigest()
    if actual != expected_digest:
        raise ValueError(
            "MD5 digest for downloaded differs. Actual {!r} != {!r}."
            .format(actual, expected_digest))


def _verify_file_size(path: Path, expected_size: int) -> None:
    """Raise ValueError unless ``path`` is exactly ``expected_size`` bytes long."""
    actual_size = os.stat(str(path)).st_size
    if actual_size != expected_size:
        raise ValueError(
            "Size of downloaded file differs. Actual {!r} != {!r}."
            .format(actual_size, expected_size))


def download_file(link: "ReleaseLink", target_filepath: Path) -> None:
    """Download ``link.url`` to ``target_filepath`` and verify the result.

    The size is checked when ``link.size`` is set and the MD5 digest when
    ``link.md5_digest`` is set. If writing or verification fails, the file
    is removed so that a broken download is not mistaken for a finished one.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        ValueError: if the downloaded file fails size or digest verification.
    """
    response = requests.get(link.url, stream=True)
    try:
        # Fail before touching the target instead of saving an error page.
        response.raise_for_status()
        try:
            with target_filepath.open('wb') as f:
                for chunk in response.iter_content(chunk_size=1024):
                    if chunk:  # filter out keep-alive new chunks
                        f.write(chunk)
            if link.size:
                _verify_file_size(target_filepath, link.size)
            if link.md5_digest:
                _verify_md5_digest(target_filepath, link.md5_digest)
        except BaseException:
            if target_filepath.is_file():
                target_filepath.unlink()
            raise
    finally:
        response.close()
@contextlib.contextmanager
def temporarely_extracted(archive_path: Path):
    """Context manager yielding the root directory of the unpacked archive.

    The archive is unpacked into a fresh temporary directory that is removed
    when the context exits. The archive is expected to unpack into a single
    top-level directory; the first directory (in sorted order) is yielded.

    Raises:
        ValueError: if the file suffix has no entry in EXTRACT_FN_BY_EXT, or
            if unpacking produced no top-level directory.
    """
    _, ext = os.path.splitext(str(archive_path))
    extract_fn = EXTRACT_FN_BY_EXT.get(ext)
    if not extract_fn:
        raise ValueError("Extract function for {} is not found.".format(ext))
    with temp_dir() as path:
        extract_fn(archive_path, path)
        entries = sorted(path.iterdir())
        # we expect that it will be extracted as one directory in a temp dir
        source_dir = next((entry for entry in entries if entry.is_dir()), None)
        if source_dir is None:
            # Raised explicitly: an assert would be skipped under ``python -O``.
            raise ValueError(
                "Assumption about extracted dirctory didn't work for archive: {!r}. "
                "Content of the temporary folder is: {}".format(
                    archive_path,
                    ','.join(map(str, entries))))
        yield source_dir
@contextlib.contextmanager
def temp_dir() -> typing.Iterator[Path]:
    """Context manager yielding a new temporary directory as a Path.

    The directory and everything in it are removed when the context exits,
    whether or not the body raised.
    """
    # Created outside the try block: if mkdtemp() itself fails there is
    # nothing to clean up and the original error must propagate unchanged.
    tempdir = tempfile.mkdtemp()
    try:
        yield Path(tempdir)
    finally:
        if os.path.isdir(tempdir):
            shutil.rmtree(tempdir)
def run_mocked_setup(source_dir: Path) -> typing.Dict[str, typing.Any]:
    """Run ``setup.py`` from ``source_dir`` with ``setup()`` mocked out.

    ``distutils.core.setup`` and ``setuptools.setup`` are replaced by mocks,
    the script is executed as ``__main__`` from inside ``source_dir``, and
    the keyword arguments of the recorded ``setup()`` call are returned.

    Returns:
        The ``setup()`` keyword arguments without ``classifiers``,
        ``license`` and ``long_description``; ``install_requires`` and
        ``setup_requires`` are always present and converted to tuples.

    Raises:
        Exception: if neither mock was called. An ImportError raised by the
            script is attached as the cause.
    """
    print('Run mocked setup: {}'.format(source_dir))
    # runpy replaces the ``imp`` module, which was removed in Python 3.12; it
    # also restores sys.modules['__main__'] afterwards and keeps no open file.
    import runpy
    from unittest import mock

    import_error = None
    # NOTE(review): on Python >= 3.12 patching "distutils.core.setup" needs
    # the distutils shim shipped with setuptools - confirm it is installed.
    with mock.patch("distutils.core.setup") as distutils_setup, \
            mock.patch("setuptools.setup") as setuptools_setup, \
            cd(source_dir):
        try:
            runpy.run_path(os.path.join('.', 'setup.py'), run_name='__main__')
        except ImportError as error:
            # Best effort: setup() may already have been recorded before the
            # import failed, so keep going and report the error only if not.
            import_error = error

    if distutils_setup.called:
        fn, original = distutils_setup, 'distutils'
    else:
        fn, original = setuptools_setup, 'setuptools'
    if not fn.called:
        raise Exception("None of the mocks is called.") from import_error
    print("Called one: {}".format(original))

    # Copy so the call recorded on the mock is not mutated.
    kwargs = dict(fn.call_args[1])
    # remove version field as it may call imports to the project packages
    # see example unittest2==1.1.0
    # kwargs.pop("version", None)
    for param in ("classifiers", "license", "long_description"):
        kwargs.pop(param, None)
    kwargs['install_requires'] = reqs_to_tuple(kwargs.get('install_requires'))
    kwargs['setup_requires'] = reqs_to_tuple(kwargs.get('setup_requires'))
    return kwargs
def reqs_to_tuple(reqs):
    """Normalise a ``*_requires`` value of ``setup()`` to a flat tuple of strings.

    Accepted inputs:
      * ``None``                    -> ``()``
      * a string                    -> one entry per non-blank line
      * an iterable of strings      -> the same strings
      * an iterable of iterables    -> flattened by one level

    The previous ``sum(reqs, [])`` only worked for a tuple of lists and
    raised TypeError for a plain tuple of strings.
    """
    if reqs is None:
        return ()
    if isinstance(reqs, str):
        return tuple(line.strip() for line in reqs.splitlines() if line.strip())
    flat = []
    for item in reqs:
        if isinstance(item, str):
            flat.append(item)
        else:
            flat.extend(item)
    return tuple(flat)
@contextlib.contextmanager
def cd(path: Path):
    """Context manager running its body with ``path`` as the working directory.

    The working directory in effect on entry is restored on exit, also when
    the body (or the directory change itself) raises.
    """
    previous_cwd = os.getcwd()
    destination = str(path)
    try:
        os.chdir(destination)
        yield
    finally:
        os.chdir(previous_cwd)
def get_dep_tree(name: str, version:str=None, client:Pypi=None, all_packages=None):
    """Build the dependency tree of a package by inspecting its setup() call.

    Args:
        name: package name; matched case-insensitively against the index.
        version: version to inspect; defaults to the last entry returned by
            ``client.get_package_versions``.
        client: Pypi client; defaults to DEFAULT_CLIENT.
        all_packages: mapping of lower-cased name to index name. Built from
            the client when not given and reused for the recursive calls.

    Returns:
        ``{'name': ..., 'version': ..., 'reqs': [...]}`` where every entry of
        ``reqs`` is the tree of one requirement (install and setup
        requirements) plus a ``'version_spec'`` key.

    Raises:
        ValueError: if the package or one of its requirements is unknown to
            the index, or if no known version satisfies a requirement.

    NOTE(review): there is no cycle detection; packages that require each
    other would recurse without end.
    """
    client = client or DEFAULT_CLIENT
    if not all_packages:
        print("Get all packages")
        all_packages = {
            package_name.lower(): package_name
            for package_name in client.get_all_packages_names()
        }
    normalised_name = all_packages.get(name.lower())
    if not normalised_name:
        raise ValueError("Package '{}' doesn't exist".format(name))
    elif normalised_name != name:
        print("Normalised name from '{}' to '{}'".format(name, normalised_name))
        name = normalised_name
    if not version:
        versions = client.get_package_versions(name)
        print('versions:{} '.format(versions))
        version = versions[-1]
        print("Found versions {}. Took latest one {}".format(', '.join(versions), version))
    release = client.get_release(name, version)
    details = get_setup_details(release)
    requirements = details.get('install_requires') + details.get('setup_requires')
    reqs = []
    # sorted() makes the order of the resulting tree reproducible.
    for version_spec in sorted(set(requirements)):
        req_name, spec = parse_to_semantic_spec(version_spec)
        normalised_req_name = all_packages.get(req_name.lower())
        if not normalised_req_name:
            # Without this guard the name would become None and fail later
            # with an unrelated AttributeError.
            raise ValueError(
                "Requirement '{}' of package '{}' doesn't exist".format(req_name, name))
        if normalised_req_name != req_name:
            print("Normalised name from '{}' to '{}'".format(req_name, normalised_req_name))
            req_name = normalised_req_name
        print("Requirement: '{name}' with spec: '{spec}'".format(name=req_name, spec=spec))
        if spec:
            spec = SpecifierSet(spec)
            versions = client.get_package_versions(req_name)
            print("Versions: {}".format(', '.join(versions)))
            candidates = list(spec.filter(versions))
            if not candidates:
                raise ValueError(
                    "No version of '{}' satisfies '{}' (required by '{}')".format(
                        req_name, spec, name))
            req_version = str(candidates[-1])
            print("Filtered versions: {}. Take latest: {}"
                  .format(', '.join(map(str, candidates)), req_version))
        else:
            req_version = client.get_latest_version(req_name)
            print("Take latest: {}".format(req_version))
        req_dep_tree = get_dep_tree(
            req_name,
            version=str(req_version),
            client=client,
            all_packages=all_packages)
        req_dep_tree['version_spec'] = str(spec)
        reqs.append(req_dep_tree)
    return {
        'name': name,
        'version': version,
        'reqs': reqs
    }
# Requirement string: project name, optional ``[extras]``, optional version
# specifier, optional ``; marker``. Only the name and the specifier are
# captured. The name may contain letters, digits, ``.``, ``_`` and ``-`` and
# must start and end with a letter or digit.
PYPI_VERSION_SPEC_RE = re.compile(
    r"^\s*([A-Za-z0-9](?:[A-Za-z0-9._-]*[A-Za-z0-9])?)"
    r"\s*(?:\[[^\]]*\])?"
    r"\s*([^;]*?)\s*(?:;.*)?$"
)


def parse_to_semantic_spec(version_spec: str) -> typing.Tuple[str, str]:
    """ Parse form "NAME<spec>" to pair of NAME and <spec>

    Extras (``NAME[extra]``) and environment markers (``; marker``) are
    dropped; ``<spec>`` is returned without surrounding whitespace and is the
    empty string when the requirement has no version constraint.

    Raises:
        ValueError: if ``version_spec`` does not start with a project name.
    """
    matched = PYPI_VERSION_SPEC_RE.match(version_spec)
    if matched:
        return matched.group(1), matched.group(2)
    raise ValueError("Cannot parse version spec from: '{}'".format(version_spec))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment