-
-
Save Cartroo/9e0e67821a0c07e594f4 to your computer and use it in GitHub Desktop.
Quick and dirty script for cloning a system-installed package into a virtualenv. It's only had limited testing so far, so don't rely on it for production use!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
"""Clone packages into a virtualenv. | |
This script should be invoked at the system level (i.e. outside the destination | |
virtualenv). The first argument should be the name of the destination virtualenv | |
and the remaining arguments specify packages to clone. | |
""" | |
import pkg_resources | |
import argparse | |
import os | |
import shutil | |
import subprocess | |
import sys | |
import time | |
class ClonePkgError(Exception): | |
pass | |
def get_arg_parser(): | |
"""Construct command-line parser.""" | |
parser = argparse.ArgumentParser(description=__doc__) | |
parser.add_argument("virtualenv", metavar="virtualenv", nargs=1, | |
help="the destination virtualenv") | |
parser.add_argument("packages", metavar="package", nargs="+", | |
help="one or more packages to clone") | |
parser.add_argument("--dry-run", dest="dryrun", action="store_true", | |
default=False, | |
help="instead of copying, dump shell commands instead") | |
return parser | |
class ProgressWriter(object): | |
def __init__(self, total, message=None, silent=False): | |
self.total = int(total) | |
self.current = 0 | |
self.written = 0 | |
self.last_write_time = 0 | |
self.silent = silent | |
if message is not None and not silent: | |
sys.stdout.write(str(message) + ": ") | |
sys.stdout.flush() | |
def __enter__(self): | |
self.current = 0 | |
return self | |
def __exit__(self, exc_type, value, traceback): | |
if self.silent: | |
return | |
if self.current < self.total: | |
msg = "incomplete" | |
else: | |
msg = "done" | |
self._refresh("%s [%d of %d]" % (msg, self.current, self.total)) | |
sys.stdout.write("\n") | |
sys.stdout.flush() | |
def update(self, current): | |
self.current = current | |
if not self.silent and int(time.time()) != self.last_write_time: | |
percent = min(99, (self.current * 100) // self.total) | |
self._refresh("%d%% [%d of %d]" % | |
(percent, self.current, self.total)) | |
self.last_write_time = int(time.time()) | |
def _refresh(self, new_text): | |
sys.stdout.write("\b" * self.written) | |
sys.stdout.write(new_text) | |
if len(new_text) < self.written: | |
sys.stdout.write(" " * (self.written - len(new_text))) | |
sys.stdout.write("\b" * (self.written - len(new_text))) | |
sys.stdout.flush() | |
self.written = len(new_text) | |
class PackageCloner(object): | |
ver_cmd = ("import sys; print str(sys.version_info.major) + '.' +" | |
"str(sys.version_info.minor)") | |
def __init__(self, virtualenv): | |
"""Check virtualenv exists and initialise cloner.""" | |
# Check for current virtualenv and abort if found. | |
current_env = os.environ.get("VIRTUAL_ENV", None) | |
if current_env is not None: | |
raise ClonePkgError("leave current virtualenv %r before invoking" | |
% (current_env,)) | |
# Check for existence of destination virtualenv | |
default_root = os.path.expanduser("~/.virtualenvs") | |
virtualenv_root = os.environ.get("WORKON_HOME", default_root) | |
self.virtualenv_path = os.path.join(virtualenv_root, virtualenv) | |
if not os.path.isdir(self.virtualenv_path): | |
raise ClonePkgError("virtualenv %r not found" | |
% (self.virtualenv_path,)) | |
# Determine source and destination python versions. | |
with open(os.devnull, "w") as null: | |
cmdline = [os.path.join(self.virtualenv_path, "bin", "python"), | |
"-c", self.ver_cmd] | |
ver_str = subprocess.check_output(cmdline, stderr=null).strip() | |
self.dst_ver_dir = "python" + ver_str | |
ver_str = ".".join((str(sys.version_info.major), | |
str(sys.version_info.minor))) | |
self.src_ver_dir = "python" + ver_str | |
# Collect list of installed packages. | |
self.system_packages = dict((p.project_name.lower(), p) | |
for p in pkg_resources.working_set) | |
def translate_src_path(self, src): | |
"""Translate a source path to corresponding dest path.""" | |
dst = None | |
# Check for the first occurrence of "lib" or "bin" and replace those | |
# with the top-level versions in the virtualenv. | |
src_items = [i for i in src.split(os.sep) if i] | |
while src_items: | |
item = src_items.pop(0) | |
if item in ("lib", "bin", "include"): | |
dst = os.path.join(self.virtualenv_path, item, | |
os.sep.join(src_items)) | |
break | |
else: | |
# Otherwise we use their path relative to the root directory, with | |
# the virtualenv root transplanted in. A prefix of "/usr" or | |
# "/usr/local" is stripped off prior to joining, if found. | |
for prefix in (os.path.join("", "usr", "local", ""), | |
os.path.join("", "usr", "")): | |
if src.startswith(prefix): | |
src = src[len(prefix):] | |
break | |
dst = os.path.join(self.virtualenv_path, src.lstrip(os.sep)) | |
# Finally, replace old with new python versions. | |
return dst.replace(self.src_ver_dir, self.dst_ver_dir) | |
def clone(self, package, dry_run=False): | |
"""Clone specified package into virtualenv.""" | |
if package.lower() not in self.system_packages: | |
raise ClonePkgError("installed package %r not found" % (package,)) | |
dist = self.system_packages[package.lower()] | |
egg_dir = dist.egg_name() + ".egg-info" | |
src_egg_path = os.path.join(dist.location, egg_dir) | |
file_list = os.path.join(src_egg_path, "installed-files.txt") | |
src_dst_files = set() | |
dst_dirs = set() | |
try: | |
with open(file_list, "r") as fd: | |
for line in fd: | |
rel_path = line.strip() | |
src = os.path.normpath(os.path.join(src_egg_path, rel_path)) | |
dst = self.translate_src_path(src) | |
if os.path.isfile(src): | |
src_dst_files.add((src, dst)) | |
dst_dirs.add(os.path.dirname(dst)) | |
elif os.path.isdir(src): | |
dst_dirs.add(dst) | |
else: | |
raise ClonePkgError("missing installed file for %r: %s" | |
% (package, src)) | |
except IOError, e: | |
raise ClonePkgError("failed to open file list for %r: %s" | |
% (package, e)) | |
# Create destination directories. | |
with ProgressWriter(len(dst_dirs), "Creating dirs", | |
silent=dry_run) as progress: | |
for i, dst in enumerate(dst_dirs): | |
if not os.path.exists(dst): | |
if dry_run: | |
print "mkdir -p '" + dst + "'" | |
else: | |
os.makedirs(dst, 0755) | |
progress.update(i + 1) | |
# Copy files across. | |
with ProgressWriter(len(src_dst_files), "Copying files", | |
silent=dry_run) as progress: | |
for i, (src, dst) in enumerate(src_dst_files): | |
self.copy_file(src, dst, dry_run) | |
progress.update(i + 1) | |
def copy_file(self, src, dst, dry_run=False): | |
"""Check for and update shebang line.""" | |
if dry_run: | |
print "cp '" + src + "' '" + dst + "'" | |
with open(src, "r") as fd: | |
first_line = fd.readline(256) | |
if (len(first_line) < 256 and first_line.startswith("#!") and | |
"bin/python" in first_line): | |
new_python = os.path.join(self.virtualenv_path, "bin", "python") | |
if dry_run: | |
print ("sed -i '1s/#!.*bin\/python/#!" + new_python + "/' " | |
+ dst) | |
else: | |
tempfile = dst + ".clonepkg-tmp" | |
with open(tempfile, "w") as out: | |
python_snippet = os.path.join("bin", "python") | |
index = first_line.find(python_snippet) | |
out.write("#!" + new_python + | |
first_line[index + len(python_snippet):]) | |
for line in fd: | |
out.write(line) | |
os.rename(tempfile, dst) | |
return | |
# If there's no shebang line to copy, just use shutil.copy(). | |
if not dry_run: | |
shutil.copy(src, dst) | |
def main(argv): | |
"""Script entry point.""" | |
parser = get_arg_parser() | |
args = parser.parse_args(args=argv[1:]) | |
try: | |
cloner = PackageCloner(args.virtualenv[0]) | |
except ClonePkgError, e: | |
print >>sys.stderr, "error: %s" % (str(e),) | |
return 1 | |
ret = 0 | |
for package in args.packages: | |
try: | |
cloner.clone(package, dry_run=args.dryrun) | |
except ClonePkgError, e: | |
print >>sys.stderr, "error cloning %r: %s" % (package, str(e)) | |
ret = 2 | |
return ret | |
if __name__ == "__main__": | |
sys.exit(main(sys.argv)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment