Skip to content

Instantly share code, notes, and snippets.

@benoit-pierre
Created April 24, 2021 21:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save benoit-pierre/863b65e7220346817efffcc8982a8cf5 to your computer and use it in GitHub Desktop.
Save benoit-pierre/863b65e7220346817efffcc8982a8cf5 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from collections import defaultdict
from collections import namedtuple
from fnmatch import fnmatch
from html import escape as html_escape
from pathlib import Path
import functools
import glob
import itertools
import operator
import sys
import tempfile
import textwrap
from dateparser import parse as parse_date
from distlib.wheel import Wheel, is_compatible as is_wheel_compatible
from packaging.markers import UndefinedEnvironmentName, _evaluate_markers as evaluate_markers
from packaging.requirements import Requirement
from packaging.specifiers import SpecifierSet
from packaging.version import parse as parse_version
from pkg_resources import safe_extra, safe_name, split_sections
from pkginfo import SDist
from requests_cache.core import CachedSession
from tqdm import tqdm
PYPI_URL = 'https://pypi.org/pypi'
REGISTRY_URL = 'https://github.com/openstenoproject/plover_plugins_registry/raw/master/registry.json'
TargetEnv = namedtuple('TargetEnv', '''
name
sys_platform
platform_system
python_version
python_implementation
sample_wheels
''')
Distribution = namedtuple('Distribution', '''
filename type requires_python url sha256
''')
class Release(namedtuple('Release', '''
version requires_dist requires_python
available_distributions
''')):
@property
def sdist_only(self):
for dist in self.available_distributions:
if dist.type == 'bdist_wheel':
return False
return True
class Package(namedtuple('Package', '''
name is_plugin supported_releases
''')):
@property
def key(self):
return normalize_name(self.name)
def normalize_name(name):
return safe_name(name).lower()
def parse_requirement(req):
req = Requirement(req)
req.name = normalize_name(req.name)
return req
def strip_marker(req):
req = Requirement(str(req))
req.marker = None
return req
def extract_package_requires_dist(entries, read_file, dir_pattern=None):
requires_dist = []
for name in 'requires.txt', 'depends.txt':
if dir_pattern is not None:
pattern = '/'.join((dir_pattern, name))
else:
pattern = name
try:
match = next(n for n in entries if fnmatch(n, pattern))
except StopIteration:
continue
data = read_file(match).decode().split('\n')
for section, reqs in split_sections(data):
if section is None:
extra = ''
marker = ''
else:
extra, __, marker = section.partition(':')
extra = safe_extra(extra) or ''
if extra:
if marker:
marker = '(%s) and ' % marker
marker += 'extra == %r' % extra
for r in reqs:
r = parse_requirement(r)
assert not r.marker
r = str(r)
if marker:
r += '; ' + marker
requires_dist.append(str(r))
return requires_dist
def sdist_metadata(package):
metadata = SDist(package)
if metadata.requires_dist:
return metadata
__, entries, read_file = SDist._get_archive(package)
dir_pattern = '*/%s.egg-info' % glob.escape(metadata.name)
metadata.requires_dist = extract_package_requires_dist(entries,
read_file,
dir_pattern)
return metadata
class PackageIndexBuilder:
def __init__(self, target_envs, release_date_cutoff=None, filters=None):
self._target_envs = {}
self._supported_python = set()
self._supported_tags = defaultdict(set)
self._supported_envs = {}
for te in target_envs:
te = te._replace(python_version=parse_version(te.python_version))
self._supported_python.add(te.python_version)
for wheel_filename in te.sample_wheels:
wheel = Wheel(wheel_filename)
for interpreter, abi, platform in itertools.product(
wheel.pyver,
wheel.abi,
wheel.arch,
):
self._supported_tags[(interpreter, abi, platform)].add(te.name)
self._supported_envs[te.name] = {
'platform.python_implementation': te.python_implementation,
'platform_python_implementation': te.python_implementation,
'python_implementation': te.python_implementation,
'python_version': str(te.python_version),
'sys.platform': te.sys_platform,
'sys_platform': te.sys_platform,
'platform_system': te.platform_system,
}
assert te.name not in self._target_envs
self._target_envs[te.name] = te
if release_date_cutoff is None:
self._release_date_cutoff = None
else:
self._release_date_cutoff = parse_date(release_date_cutoff)
self._filters = defaultdict(set)
if filters is not None:
for req in map(parse_requirement, filters):
self._filters[req.name].add(req.specifier)
self._packages = {}
self._session = CachedSession(cache_name=Path(__file__).stem, backend='sqlite')
self._session.remove_expired_responses()
self._resolved = defaultdict(dict)
def _is_excluded(self, key, version):
for specifier in self._filters[key]:
if version not in specifier:
return True
return False
def _is_supported_wheel(self, wheel_filename):
return is_wheel_compatible(wheel_filename, self._supported_tags)
def _is_supported_python(self, requires_python):
if requires_python is None:
return True
requires_python = SpecifierSet(requires_python)
return any(v in requires_python for v in self._supported_python)
def _fetch_release_json(self, name, version=None):
if version is None:
url = '%s/%s/json' % (PYPI_URL, name)
expire_after = 60 * 10 * 60
else:
url = '%s/%s/%s/json' % (PYPI_URL, name, version)
expire_after = None
resp = self._session.get(url, expire_after=expire_after)
if resp.status_code != 200:
return None
return resp.json()
def _evaluate_marker(self, marker, extra_list=()):
if marker is None:
return True
for env in self._supported_envs.values():
env = dict(env)
for extra in ['None'] + list(extra_list):
env['extra'] = extra
try:
if evaluate_markers(marker._markers, env):
return True
except UndefinedEnvironmentName as e:
breakpoint()
raise
return False
def _fetch_package_list(self, name_list):
name_list = list(name_list)
for name in tqdm(name_list, leave=False):
package_data = self._fetch_release_json(name)
if package_data is None:
continue
info = package_data['info']
keywords = (info['keywords'] or '').split()
is_plugin = 'plover_plugin' in keywords
pkg = Package(info['name'], is_plugin, {})
for version, download_list in package_data['releases'].items():
if self._is_excluded(pkg.key, version):
continue
release = Release(version, None, None, [])
sdist_list = []
wheel_list = []
for download in download_list:
if download['yanked']:
continue
filename = download['filename']
packagetype = download['packagetype']
if packagetype == 'bdist_wheel':
if not self._is_supported_wheel(filename):
continue
dist_list = wheel_list
elif packagetype == 'sdist':
dist_list = sdist_list
else:
continue
if self._release_date_cutoff is not None:
if parse_date(download['upload_time']) < self._release_date_cutoff:
continue
requires_python = download['requires_python']
if not self._is_supported_python(requires_python):
continue
dist = Distribution(filename, packagetype,
requires_python, download['url'],
download['digests']['sha256'])
dist_list.append(dist)
dist_list = wheel_list or sdist_list
if dist_list:
release.available_distributions.extend(dist_list)
pkg.supported_releases[release.version] = release
assert pkg.key not in self._packages
self._packages[pkg.key] = pkg
version = info['version']
release = pkg.supported_releases.get(version)
if release is None:
continue
release = release._replace(requires_python=info['requires_python'],
requires_dist=info['requires_dist'] or [])
pkg.supported_releases[version] = release
def _fetch_release_sdist(self, release):
assert len(release.available_distributions) == 1
sdist = release.available_distributions[0]
resp = self._session.get(sdist.url)
if resp.status_code != 200:
print('failed to fetch `%s`: %s' % (sdist.url, resp.reason))
return None
with tempfile.TemporaryDirectory() as tmpdir:
sdist_file = Path(tmpdir) / sdist.filename
sdist_file.write_bytes(resp.content)
metadata = sdist_metadata(str(sdist_file))
return {
'info': {
'name': normalize_name(metadata.name),
'version': metadata.version,
'yanked': False,
'requires_python': metadata.requires_python,
'requires_dist': metadata.requires_dist,
},
}
def _fetch_release_list(self, release_list):
release_list = list(release_list)
for name, release in tqdm(release_list, leave=False):
if release.sdist_only:
release_data = self._fetch_release_sdist(release)
else:
release_data = self._fetch_release_json(name, release.version)
if release_data is None:
continue
info = release_data['info']
assert normalize_name(info['name']) == name
assert info['version'] == release.version
assert not info['yanked']
pkg = self._packages[name]
release = pkg.supported_releases[release.version]
release = release._replace(requires_python=info['requires_python'],
requires_dist=info['requires_dist'] or [])
pkg.supported_releases[release.version] = release
def _resolve(self, req, from_chain=()):
if not isinstance(req, Requirement):
req = parse_requirement(req)
req_str = str(req)
resolved = self._resolved[req.name]
if req_str in resolved:
resolution, from_chain_set = resolved[req_str]
from_chain_set.add(from_chain)
return resolution
if not self._evaluate_marker(req.marker):
resolved[req_str] = (False, {from_chain})
return False
pkg = self._packages[req.name]
matching_releases = {
release.version: release
for release in pkg.supported_releases.values()
if release.version in req.specifier
and not self._is_excluded(req.name, release.version)
}
self._fetch_release_list((req.name, release)
for release in matching_releases.values()
if release.requires_dist is None)
for version in sorted(matching_releases):
release = pkg.supported_releases[version]
requires_dist = list(map(parse_requirement, release.requires_dist))
requires_dist = [
dep
for dep in requires_dist
if self._evaluate_marker(dep.marker, req.extras)
]
missing_packages = {
dep.name
for dep in requires_dist
if dep.name not in self._packages
}
self._fetch_package_list(missing_packages)
for dep in requires_dist:
if not self._resolve(strip_marker(dep), (str(req),) + from_chain):
del matching_releases[version]
break
resolution = bool(matching_releases)
resolved[req_str] = (resolution, {from_chain})
return resolution
def build(self):
self._fetch_package_list(['plover'])
assert self._resolve('plover[gui_qt,log]>=4.0.0.dev9')
plugins_list = self._session.get(REGISTRY_URL).json()
# plugins_list = ['plover-italian-stentura']
self._fetch_package_list(plugins_list)
for plugin_name in plugins_list:
self._resolve(plugin_name)
for pkg in self._packages.values():
for release in list(pkg.supported_releases.values()):
if release.requires_dist is None:
del pkg.supported_releases[release.version]
print('%u packages' % len(self._packages))
print('%u releases' % functools.reduce(
operator.add, (len(pkg.supported_releases)
for pkg in self._packages.values()), 0))
def to_html(self, file=sys.stdout):
print(textwrap.dedent(
'''
<!DOCTYPE html>
<html>
<head>
<meta name="pypi:repository-version" content="1.0">
<title>Plover Package Index</title>
</head>
<body>
'''
).lstrip(), file=file, end='')
for key, pkg in sorted(self._packages.items()):
if not pkg.supported_releases:
print('no supported release for `%s`' % pkg.name, file=sys.stderr)
for dep, (__, from_chain_set) in sorted(self._resolved[key].items()):
for from_chain in sorted(from_chain_set):
print('-', ' <- '.join((dep,) + from_chain))
continue
print('<h1>Links for <a id="%s" href="%s">%s</a></h1>' % (
key, '%s/%s' % (PYPI_URL, pkg.name), html_escape(pkg.name),
), file=file)
for version in sorted(pkg.supported_releases, key=parse_version):
release = pkg.supported_releases[version]
if release.sdist_only:
print('no wheels for %s-%s' % (pkg.name, version))
for dist in sorted(release.available_distributions):
attrs = {
'href': '%s#sha256=%s' % (dist.url, dist.sha256),
}
requires_python = ', '.join(set(filter(None, (
release.requires_python,
dist.requires_python,
))))
if requires_python:
attrs['data-requires-python'] = html_escape(requires_python)
print('<a %s>%s</a><br/>' % (
' '.join('%s="%s"' % a for a in attrs.items()),
html_escape(dist.filename),
), file=file)
print(textwrap.dedent(
'''
</body>
</html>
'''
).lstrip(), file=file, end='')
pib = PackageIndexBuilder([
TargetEnv(
'linux-py36',
'linux', 'Linux', '3.6', 'CPython', '''
appdirs-1.4.4-py3-none-any.whl
cffi-1.14.5-cp36-cp36m-manylinux1_x86_64.whl
hidapi-0.10.1-cp36-cp36m-manylinux2014_x86_64.whl
PyQt5-5.15.4-cp36-abi3-manylinux2014_x86_64.whl
PyQt5_Qt5-5.15.2-py3-none-manylinux2014_x86_64.whl
'''.split()
),
TargetEnv(
'linux-py37',
'linux', 'Linux', '3.7', 'CPython', '''
appdirs-1.4.4-py3-none-any.whl
cffi-1.14.5-cp37-cp37m-manylinux1_x86_64.whl
hidapi-0.10.1-cp37-cp37m-manylinux2014_x86_64.whl
PyQt5-5.15.4-cp37-abi3-manylinux2014_x86_64.whl
PyQt5_Qt5-5.15.2-py3-none-manylinux2014_x86_64.whl
'''.split()
),
TargetEnv(
'linux-py38',
'linux', 'Linux', '3.8', 'CPython', '''
appdirs-1.4.4-py3-none-any.whl
cffi-1.14.5-cp38-cp38-manylinux1_x86_64.whl
hidapi-0.10.1-cp38-cp38-manylinux2014_x86_64.whl
PyQt5-5.15.4-cp38-abi3-manylinux2014_x86_64.whl
PyQt5_Qt5-5.15.2-py3-none-manylinux2014_x86_64.whl
'''.split()
),
TargetEnv(
'linux-py39',
'linux', 'Linux', '3.9', 'CPython', '''
appdirs-1.4.4-py3-none-any.whl
cffi-1.14.5-cp39-cp39-manylinux1_x86_64.whl
hidapi-0.10.1-cp39-cp39-manylinux2014_x86_64.whl
PyQt5-5.15.4-cp39-abi3-manylinux2014_x86_64.whl
PyQt5_Qt5-5.15.2-py3-none-manylinux2014_x86_64.whl
'''.split()
),
TargetEnv(
'macos-py38',
'darwin', 'Mac OS X', '3.8', 'CPython', '''
aiohttp-3.6.2-cp38-cp38m-macosx_10_13_x86_64.whl
appdirs-1.4.4-py3-none-any.whl
cffi-1.14.5-cp38-cp38-macosx_10_9_x86_64.whl
PyQt5_Qt5-5.15.2-py3-none-macosx_10_13_intel.whl
'''.split()
),
TargetEnv(
'windows-py38',
'win32', 'Windows', '3.8', 'CPython', '''
appdirs-1.4.4-py3-none-any.whl
cffi-1.14.5-cp38-cp38-win_amd64.whl
PyQt5_Qt5-5.15.2-py3-none-win_amd64.whl
PyQt5-5.15.4-cp38-none-win_amd64.whl
'''.split()
),
],
'2015-01-01T00:00:00',
'''
aiohttp>=3.6.2
certifi>=2020.4.5
cffi>=1.14.0
cmarkgfm>=0.5.0
dbus-python>=1.2.16
ddt>=1.3.0
fuzzyset>=0.0.19
gitdb2>=3.0.0
GitPython>=3.0.6
hidapi>=0.9.0.post2
html5lib>=1.1
idna>=2.9
idna_ssl>=1.1.0
importlib-metadata>=1.4.0
jdcal>=1.4.1
jsonpickle>=1.3
jsonschema>=3.2.0
lxml>=4.5.0
more-itertools>=8.1.0
multidict>=4.7.6
openpyxl>=3.0.3
packaging>=20.0
pbr>=5.4.5
pip>=20.0.2
plover>=4.0.0.dev9
pycparser>=2.20
pyexcel-io>=0.6.2
pyexcel-ods3>=0.6.0
pyexcel-xlsx>=0.6.0
pyexcel>=0.6.1
pyfiglet>=0.8.post1
Pygments>=2.6.0
PyMI>=1.0.5
pyobjc-core>=6.2
pyobjc-framework-Cocoa>=6.2
pyobjc-framework-Quartz>=6.2
pyparsing>=2.4.7
PyQt5-sip>=4.19.19
PyQt5>=5.13.2
pyrsistent>=0.15.7
pyserial>=3.5
python-rtmidi>=1.4.0
pyusb>=1.1.1
readme-renderer>=25.0
requests-cache>=0.5.2
requests-futures>=1.0.0
requests>=2.23.0
ruamel.yaml>=0.16.6
setuptools>=51.0.0
six>=1.14.0
smmap2>=3.0.1
smmap>=3.0.0
textstat>=0.5.7
texttable>=1.6.3
typing>=3.7.4.1
urllib3>=1.25.8
webencodings>=0.5.1
wcwidth>=0.2.0
wheel>=0.34.0
yarl>=1.5.1
zipp>=2.0.0
'''.split(),
)
pib.build()
# breakpoint()
with open('plover_package_index.html', 'w') as fp:
pib.to_html(file=fp)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment