Created
July 12, 2013 16:56
-
-
Save pfmoore/5985969 to your computer and use it in GitHub Desktop.
Standalone wheel installer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Standalone wheel file installer. | |
Wheels are a binary install format for Python. The wheel package provides | |
hooks to allow pip to install wheels, as well as a "wheel install" command for | |
standalone use. However, it is still necessary to have the wheel package | |
installed if you want to install a wheel. Given that the wheel format is | |
intended to be "truly easy to install", it seems reasonable to expect to be | |
able to install a wheel without needing any special tools. | |
Hence this utility. | |
It offers the ability to do nothing more than install a wheel file that is | |
specified on the command line into the current Python installation. There are | |
a few extras - the wheel is checked for compatibility with the current Python, | |
and the user is warned if the wheel is not compatible, and the install | |
locations can be overridden (this latter feature is expected to be mainly | |
useful for testing, though). | |
""" | |
import hashlib | |
import zipfile | |
import shutil | |
import base64 | |
import csv | |
import sys | |
import os | |
import re | |
from optparse import OptionParser | |
from email.parser import Parser | |
from distutils.version import LooseVersion as Ver | |
import distutils.util | |
# Names of the install locations a wheel's contents may be directed to.
LOCATIONS = ['purelib', 'platlib', 'include', 'scripts', 'data']

# Maps each location name to a directory of the running interpreter.
PATHS = {}

try:
    import sysconfig
    PATHS.update((loc, sysconfig.get_path(loc)) for loc in LOCATIONS)
except ImportError:
    # No top-level sysconfig module: derive the same directories from the
    # distutils configuration variables instead.
    from distutils import sysconfig
    sc_vars = sysconfig.get_config_vars()
    PATHS.update({
        'purelib': os.path.join(sc_vars['LIBDEST'], 'site-packages'),
        'platlib': os.path.join(sc_vars['BINLIBDEST'], 'site-packages'),
        'include': sc_vars['INCLUDEPY'],
        # OS-Specific - Windows!!!
        'scripts': os.path.join(sc_vars['BINDIR'], 'Scripts'),
        'data': sc_vars['prefix'],
    })
def get_path(name):
    """Return the directory recorded in PATHS for the location *name*.

    Raises KeyError when *name* is not a key of PATHS.
    """
    directory = PATHS[name]
    return directory
# Text/bytes helpers whose implementation depends on the major Python version.
if sys.version_info[0] >= 3:
    def native(s):
        """Return *s* as text: bytes are decoded as ASCII, others pass through."""
        return s.decode('ascii') if isinstance(s, bytes) else s

    def open_for_csv(name, mode):
        """Open *name* in text mode with newline translation disabled."""
        return open(name, mode, newline='')

    def read_pkg_info_bytes(bytestr):
        """Parse the header block held in the bytes *bytestr* into a message."""
        text = bytestr.decode(encoding="ascii", errors="surrogateescape")
        return Parser().parsestr(text)
else:
    def native(s):
        """Return *s* unchanged."""
        return s

    def open_for_csv(name, mode):
        """Open *name* in binary mode (``mode`` with 'b' appended)."""
        return open(name, mode + 'b')

    def read_pkg_info_bytes(bytestr):
        """Parse the header block held in *bytestr* into a message."""
        return Parser().parsestr(bytestr)
# Bound ``match`` method of a pattern accepting either a wheel file name
# ("name[-ver][-build]-pyver-abi-plat.whl") or a dist-info directory name
# ("name[-ver].dist-info").  Named groups: namever, name, ver, build, pyver,
# abi, plat.  Calling it returns a match object, or None when nothing matches.
WHEEL_INFO_RE = re.compile(
    r"""^(?P<namever>(?P<name>.+?)(-(?P<ver>\d.+?))?)
    ((-(?P<build>\d.*?))?-(?P<pyver>.+?)-(?P<abi>.+?)-(?P<plat>.+?)
    \.whl|\.dist-info)$""",
    flags=re.X,
).match
def get_platform():
    """Return our platform name 'win32', 'linux_x86_64'

    The platform string reported by the standard library is normalised by
    replacing every '.' and '-' with '_', so it can be compared with the
    platform tag of a wheel file name.
    """
    # distutils is no longer part of the standard library in recent Python
    # releases; prefer it where present (unchanged behaviour) and fall back
    # to the sysconfig equivalent otherwise.
    try:
        from distutils.util import get_platform as _raw_platform
    except ImportError:
        from sysconfig import get_platform as _raw_platform
    return _raw_platform().replace('.', '_').replace('-', '_')
def pyver_match(versions):
    """Return True if the Python tag *versions* suits the running interpreter.

    *versions* is the Python tag taken from a wheel file name: one or more
    dot-separated entries such as ``py3``, ``cp27`` or ``py2.py3``.  The
    first two characters of an entry name the implementation and the rest
    is the version.  An entry matches when

    * its implementation is the generic ``py`` or the running
      implementation (``pp``, ``jy``, ``ip`` or ``cp``), and
    * its version is empty, the major version, or major followed by minor.

    The tag matches as soon as any one entry matches.
    """
    major = str(sys.version_info[0])
    pyver = major + str(sys.version_info[1])

    if hasattr(sys, 'pypy_version_info'):
        pyimpl = 'pp'
    elif sys.platform.startswith('java'):
        pyimpl = 'jy'
    elif sys.platform == 'cli':
        pyimpl = 'ip'
    else:
        pyimpl = 'cp'

    # Compare against the exact forms rather than testing for a string
    # prefix: with a two-digit minor version (e.g. "310") a prefix test
    # would also accept an unrelated tag such as "31".
    accepted_versions = ('', major, pyver)

    # Check each of the dot-separated cases
    for entry in versions.split('.'):
        impl, ver = entry[:2], entry[2:]
        # Implementation must be generic or match
        if impl not in ('py', pyimpl):
            continue
        # Version must be unspecified, the major version, or major+minor
        if ver in accepted_versions:
            # Passed all the tests - this will do
            return True
    # No version spec matched
    return False
def get_best_wheel(distname, files):
    """Pick the newest compatible wheel for *distname* among *files*.

    Each path in *files* is judged by its base name.  A file is kept when
    its name parses as a wheel name, its distribution name equals
    *distname* (ignoring case), its Python tag passes ``pyver_match`` and
    its platform tag is ``any`` or the current platform.

    Returns the path of the kept wheel with the highest version.

    Raises ValueError when no wheel is kept, or when the two highest
    ranked wheels have the same version.
    """
    # Ignore case differences
    distname = distname.lower()
    platforms = ('any', get_platform())

    # Check each file in the list
    acceptable = []
    for path in files:
        wheel = WHEEL_INFO_RE(os.path.basename(path))
        # A name that does not follow the wheel naming convention cannot be
        # a candidate; skip it instead of failing on the missing match.
        if wheel is None:
            continue
        # Is it for the specified distribution?
        if wheel.group('name').lower() != distname:
            continue
        # Is it for this version of Python?
        if not pyver_match(wheel.group('pyver')):
            continue
        # Is it for this platform?
        if wheel.group('plat') not in platforms:
            continue
        # OK, keep version and path
        acceptable.append((Ver(wheel.group('ver')), path))

    if not acceptable:
        raise ValueError("{}: No wheels match".format(distname))

    # Sort by version, in descending order (latest version first)
    acceptable.sort(reverse=True)
    # Do we have more than one wheel for the latest version?
    if len(acceptable) > 1 and acceptable[0][0] == acceptable[1][0]:
        raise ValueError("Too many wheels match")
    # Return the "path" item in the (unique) wheel with the latest version
    return acceptable[0][1]
class BadWheelFile(ValueError):
    """Raised for a wheel whose file name or contents are not acceptable."""
class WheelFile(zipfile.ZipFile):
    """A wheel archive opened for reading that can install its own contents.

    Attributes set by the constructor:

    * ``parsed_filename`` - match object for the wheel's base file name.
    * ``root`` - base install directory (the 'purelib' or 'platlib' path).
    * ``destinations`` - maps archive member names to absolute target paths.
    """

    def __init__(self, name):
        """Open the wheel at path *name* and work out where its files go.

        Raises BadWheelFile when the base name does not end in '.whl' or
        does not parse as a wheel file name.
        """
        basename = os.path.basename(name)
        self.parsed_filename = WHEEL_INFO_RE(basename)
        if not basename.endswith('.whl') or self.parsed_filename is None:
            raise BadWheelFile("Bad filename '%s'" % name)
        zipfile.ZipFile.__init__(self, name, 'r')
        try:
            wheel_meta = read_pkg_info_bytes(self.read(self.wheelinfo_name))
            # The base target location is either purelib or platlib
            if wheel_meta['Root-Is-Purelib'] == 'true':
                self.root = get_path('purelib')
            else:
                self.root = get_path('platlib')
            self.destinations = {}
            self.determine_destinations()
        except Exception:
            # Do not leave the archive open when construction fails.
            self.close()
            raise

    @property
    def distinfo_name(self):
        """Name of the wheel's '<name>-<version>.dist-info' directory."""
        return "%s.dist-info" % self.parsed_filename.group('namever')

    @property
    def datadir_name(self):
        """Name of the wheel's '<name>-<version>.data' directory."""
        return "%s.data" % self.parsed_filename.group('namever')

    @property
    def record_name(self):
        """Archive path of the RECORD file inside the dist-info directory."""
        return "%s/%s" % (self.distinfo_name, 'RECORD')

    @property
    def wheelinfo_name(self):
        """Archive path of the WHEEL file inside the dist-info directory."""
        return "%s/%s" % (self.distinfo_name, 'WHEEL')

    @staticmethod
    def _strip_dot_slash(name):
        """Return *name* without a single leading './'."""
        return name[2:] if name.startswith('./') else name

    def _file_members(self):
        """Return the archive member names that are not directory entries."""
        # Zip files can contain entries representing directories.
        # These end in a '/'.
        # We ignore these, as we create directories on demand.
        return [name for name in self.namelist() if not name.endswith('/')]

    def determine_destinations(self):
        """Fill ``self.destinations`` with the target path of every file.

        Files under '<datadir_name>/<key>/' go to the directory
        ``get_path(key)``; everything else goes under ``self.root``.

        Raises BadWheelFile for a file that sits directly inside the data
        directory, i.e. without a '<key>/' component.
        """
        for member in self._file_members():
            # Pathnames in a zipfile namelist are always /-separated.
            # In theory, paths could start with ./ or have other oddities
            # but this won't happen in practical cases of well-formed wheels.
            # We'll cover the simple case of an initial './' as it's both easy
            # to do and more common than most other oddities.
            name = self._strip_dot_slash(member)

            # Split off the base directory to identify files that are to be
            # installed in non-root locations
            basedir, sep, filename = name.partition('/')
            if sep and basedir == self.datadir_name:
                # Data file. Target destination is elsewhere
                key, sep, filename = filename.partition('/')
                if not sep:
                    raise BadWheelFile(
                        "Invalid filename in wheel: {}".format(name))
                target = get_path(key)
            else:
                # Normal file. Target destination is root
                target = self.root
                filename = name

            # Map the filename to its intended target directory plus the
            # pathname relative to that directory.  The entry is stored
            # under the exact archive name, so that iterating the archive
            # can look it up, and under the './'-stripped name, so that
            # lookups by record_name keep working.
            dest = os.path.normpath(os.path.join(target, filename))
            self.destinations[member] = dest
            self.destinations[name] = dest

    def check_overwrite(self):
        """Raise ValueError if any file's target path already exists."""
        for member in self._file_members():
            if os.path.exists(self.destinations[member]):
                raise ValueError(
                    "Wheel file {} would overwrite an existing file."
                    .format(member))

    def install(self, dry_run=False):
        """Copy the wheel's files to their targets and write RECORD.

        With *dry_run* true, only print ``<member> -> <target>`` for every
        file and write nothing.

        Otherwise every file except RECORD is copied to its target, with
        missing directories created on demand.  A new RECORD is then
        written at its target: one row per copied file holding the path
        relative to ``self.root`` ('/'-separated), the digest and the
        length, followed by a row for RECORD itself with empty digest and
        length fields.
        """
        # We're now ready to start processing the actual install. The process
        # is as follows:
        #   1. Put the files in their target locations.
        #   2. Update RECORD - write a suitably modified RECORD file to
        #      reflect the actual installed paths.
        record_data = []
        for member in self._file_members():
            dest = self.destinations[member]
            if dry_run:
                print("{} -> {}".format(member, dest))
                continue

            # Skip the RECORD file (before opening it, so nothing is left
            # open); it is regenerated below.
            if self._strip_dot_slash(member) == self.record_name:
                continue

            ddir = os.path.dirname(dest)
            if not os.path.isdir(ddir):
                os.makedirs(ddir)

            source = HashingFile(self.open(member))
            try:
                with open(dest, 'wb') as destination:
                    shutil.copyfileobj(source, destination)
            finally:
                source.close()

            # RECORD paths are '/'-separated whatever the OS separator is.
            reldest = os.path.relpath(dest, self.root).replace(os.sep, '/')
            record_data.append((reldest, source.digest(), source.length))

        if dry_run:
            return

        with open_for_csv(self.destinations[self.record_name], 'w+') as rec:
            writer = csv.writer(rec)
            for row in sorted(record_data):
                writer.writerow(row)
            writer.writerow((self.record_name, '', ''))
def urlsafe_b64encode(data):
    """Return the URL-safe base64 encoding of *data* with '=' padding removed."""
    padded = base64.urlsafe_b64encode(data)
    return padded.rstrip(b'=')
class HashingFile(object):
    """File wrapper that hashes and counts everything read through it.

    ``fd`` is the wrapped file object, ``hashtype`` the hashlib algorithm
    name, ``hash`` the running hash object and ``length`` the number of
    items returned by ``read`` so far.
    """

    def __init__(self, fd, hashtype='sha256'):
        self.fd = fd
        self.hashtype = hashtype
        self.hash = hashlib.new(hashtype)
        self.length = 0

    def read(self, n):
        """Read up to *n* from the wrapped file, updating hash and length."""
        chunk = self.fd.read(n)
        self.hash.update(chunk)
        self.length += len(chunk)
        return chunk

    def close(self):
        """Close the wrapped file."""
        self.fd.close()

    def digest(self):
        """Return the digest of the data read so far as text.

        For 'md5' this is the plain hex digest; for any other algorithm it
        is '<hashtype>=' followed by the unpadded URL-safe base64 digest.
        """
        if self.hashtype != 'md5':
            encoded = native(urlsafe_b64encode(self.hash.digest()))
            return self.hashtype + '=' + encoded
        return self.hash.hexdigest()
def main():
    """Command-line entry point: install the wheels named as arguments.

    Options:

    * ``--dry-run`` - report what would be installed, do not install.
    * ``-d`` / ``--wheel-dir`` - directory to search for wheel files; may be
      repeated.  The current directory is searched when none is given.
    * ``--force-overwrite`` - skip the check for existing target files.

    An argument that is an existing, non-directory path ending in '.whl'
    is installed as is.  Any other argument is taken as a distribution
    name and resolved with ``get_best_wheel`` against the '.whl' files
    found in the wheel directories.

    Raises ValueError when a wheel directory is not a directory; errors
    from the helpers it calls are not caught here.
    """
    parser = OptionParser()
    parser.add_option("--dry-run", dest="dry_run",
                      action="store_true", default=False,
                      help="Report which files will be written, but do not install.")
    parser.add_option('-d', "--wheel-dir", dest="wheeldirs",
                      action="append",
                      help="Directories containing wheel files")
    parser.add_option("--force-overwrite", dest="force_overwrite",
                      action="store_true", default=False,
                      help="Overwrite existing files at the destination. "
                           "This may corrupt your site-packages directory!")
    options, args = parser.parse_args()

    # options.wheeldirs is None when -d was never given, so fall back to
    # the current directory instead of iterating None.
    wheeldirs = options.wheeldirs or ['.']

    wheel_list = []
    for dirname in wheeldirs:
        absdir = os.path.abspath(dirname)
        if not os.path.isdir(absdir):
            raise ValueError("%s is not a directory" % (dirname,))
        wheel_list.extend(
            os.path.join(absdir, filename)
            for filename in os.listdir(absdir)
            if os.path.splitext(filename)[1] == '.whl'
        )

    for filename in args:
        is_wheel_path = (
            filename.endswith('.whl') and
            os.path.exists(filename) and
            not os.path.isdir(filename)
        )
        if not is_wheel_path:
            filename = get_best_wheel(filename, wheel_list)

        if options.dry_run:
            # NOTE(review): a dry run only names the chosen wheel; it never
            # reaches WheelFile.install(dry_run=True), which would list the
            # individual files.  Confirm whether that is intended.
            print("Installing wheel file {}".format(filename))
        else:
            with WheelFile(filename) as wheel:
                if not options.force_overwrite:
                    wheel.check_overwrite()
                wheel.install(dry_run=options.dry_run)
if __name__ == '__main__':
    # Script entry point: report ValueError (which includes BadWheelFile) as
    # a one-line message on stderr and signal the failure through a
    # non-zero exit status.
    try:
        main()
    except ValueError as err:
        # Format the exception itself rather than err.args[0]: that also
        # works for an exception raised without arguments.
        print("ERROR: {}".format(err), file=sys.stderr)
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment