Skip to content

Instantly share code, notes, and snippets.

@Cartroo
Last active December 17, 2015 06:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Cartroo/9e0e67821a0c07e594f4 to your computer and use it in GitHub Desktop.
Save Cartroo/9e0e67821a0c07e594f4 to your computer and use it in GitHub Desktop.
Quick and dirty script for cloning a system-installed package into a virtualenv. It's only had limited testing so far, so don't rely on it for production use!
#!/usr/bin/python
"""Clone packages into a virtualenv.
This script should be invoked at the system level (i.e. outside the destination
virtualenv). The first argument should be the name of the destination virtualenv
and the remaining arguments specify packages to clone.
"""
import pkg_resources
import argparse
import os
import shutil
import subprocess
import sys
import time
class ClonePkgError(Exception):
pass
def get_arg_parser():
"""Construct command-line parser."""
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("virtualenv", metavar="virtualenv", nargs=1,
help="the destination virtualenv")
parser.add_argument("packages", metavar="package", nargs="+",
help="one or more packages to clone")
parser.add_argument("--dry-run", dest="dryrun", action="store_true",
default=False,
help="instead of copying, dump shell commands instead")
return parser
class ProgressWriter(object):
def __init__(self, total, message=None, silent=False):
self.total = int(total)
self.current = 0
self.written = 0
self.last_write_time = 0
self.silent = silent
if message is not None and not silent:
sys.stdout.write(str(message) + ": ")
sys.stdout.flush()
def __enter__(self):
self.current = 0
return self
def __exit__(self, exc_type, value, traceback):
if self.silent:
return
if self.current < self.total:
msg = "incomplete"
else:
msg = "done"
self._refresh("%s [%d of %d]" % (msg, self.current, self.total))
sys.stdout.write("\n")
sys.stdout.flush()
def update(self, current):
self.current = current
if not self.silent and int(time.time()) != self.last_write_time:
percent = min(99, (self.current * 100) // self.total)
self._refresh("%d%% [%d of %d]" %
(percent, self.current, self.total))
self.last_write_time = int(time.time())
def _refresh(self, new_text):
sys.stdout.write("\b" * self.written)
sys.stdout.write(new_text)
if len(new_text) < self.written:
sys.stdout.write(" " * (self.written - len(new_text)))
sys.stdout.write("\b" * (self.written - len(new_text)))
sys.stdout.flush()
self.written = len(new_text)
class PackageCloner(object):
ver_cmd = ("import sys; print str(sys.version_info.major) + '.' +"
"str(sys.version_info.minor)")
def __init__(self, virtualenv):
"""Check virtualenv exists and initialise cloner."""
# Check for current virtualenv and abort if found.
current_env = os.environ.get("VIRTUAL_ENV", None)
if current_env is not None:
raise ClonePkgError("leave current virtualenv %r before invoking"
% (current_env,))
# Check for existence of destination virtualenv
default_root = os.path.expanduser("~/.virtualenvs")
virtualenv_root = os.environ.get("WORKON_HOME", default_root)
self.virtualenv_path = os.path.join(virtualenv_root, virtualenv)
if not os.path.isdir(self.virtualenv_path):
raise ClonePkgError("virtualenv %r not found"
% (self.virtualenv_path,))
# Determine source and destination python versions.
with open(os.devnull, "w") as null:
cmdline = [os.path.join(self.virtualenv_path, "bin", "python"),
"-c", self.ver_cmd]
ver_str = subprocess.check_output(cmdline, stderr=null).strip()
self.dst_ver_dir = "python" + ver_str
ver_str = ".".join((str(sys.version_info.major),
str(sys.version_info.minor)))
self.src_ver_dir = "python" + ver_str
# Collect list of installed packages.
self.system_packages = dict((p.project_name.lower(), p)
for p in pkg_resources.working_set)
def translate_src_path(self, src):
"""Translate a source path to corresponding dest path."""
dst = None
# Check for the first occurrence of "lib" or "bin" and replace those
# with the top-level versions in the virtualenv.
src_items = [i for i in src.split(os.sep) if i]
while src_items:
item = src_items.pop(0)
if item in ("lib", "bin", "include"):
dst = os.path.join(self.virtualenv_path, item,
os.sep.join(src_items))
break
else:
# Otherwise we use their path relative to the root directory, with
# the virtualenv root transplanted in. A prefix of "/usr" or
# "/usr/local" is stripped off prior to joining, if found.
for prefix in (os.path.join("", "usr", "local", ""),
os.path.join("", "usr", "")):
if src.startswith(prefix):
src = src[len(prefix):]
break
dst = os.path.join(self.virtualenv_path, src.lstrip(os.sep))
# Finally, replace old with new python versions.
return dst.replace(self.src_ver_dir, self.dst_ver_dir)
def clone(self, package, dry_run=False):
"""Clone specified package into virtualenv."""
if package.lower() not in self.system_packages:
raise ClonePkgError("installed package %r not found" % (package,))
dist = self.system_packages[package.lower()]
egg_dir = dist.egg_name() + ".egg-info"
src_egg_path = os.path.join(dist.location, egg_dir)
file_list = os.path.join(src_egg_path, "installed-files.txt")
src_dst_files = set()
dst_dirs = set()
try:
with open(file_list, "r") as fd:
for line in fd:
rel_path = line.strip()
src = os.path.normpath(os.path.join(src_egg_path, rel_path))
dst = self.translate_src_path(src)
if os.path.isfile(src):
src_dst_files.add((src, dst))
dst_dirs.add(os.path.dirname(dst))
elif os.path.isdir(src):
dst_dirs.add(dst)
else:
raise ClonePkgError("missing installed file for %r: %s"
% (package, src))
except IOError, e:
raise ClonePkgError("failed to open file list for %r: %s"
% (package, e))
# Create destination directories.
with ProgressWriter(len(dst_dirs), "Creating dirs",
silent=dry_run) as progress:
for i, dst in enumerate(dst_dirs):
if not os.path.exists(dst):
if dry_run:
print "mkdir -p '" + dst + "'"
else:
os.makedirs(dst, 0755)
progress.update(i + 1)
# Copy files across.
with ProgressWriter(len(src_dst_files), "Copying files",
silent=dry_run) as progress:
for i, (src, dst) in enumerate(src_dst_files):
self.copy_file(src, dst, dry_run)
progress.update(i + 1)
def copy_file(self, src, dst, dry_run=False):
"""Check for and update shebang line."""
if dry_run:
print "cp '" + src + "' '" + dst + "'"
with open(src, "r") as fd:
first_line = fd.readline(256)
if (len(first_line) < 256 and first_line.startswith("#!") and
"bin/python" in first_line):
new_python = os.path.join(self.virtualenv_path, "bin", "python")
if dry_run:
print ("sed -i '1s/#!.*bin\/python/#!" + new_python + "/' "
+ dst)
else:
tempfile = dst + ".clonepkg-tmp"
with open(tempfile, "w") as out:
python_snippet = os.path.join("bin", "python")
index = first_line.find(python_snippet)
out.write("#!" + new_python +
first_line[index + len(python_snippet):])
for line in fd:
out.write(line)
os.rename(tempfile, dst)
return
# If there's no shebang line to copy, just use shutil.copy().
if not dry_run:
shutil.copy(src, dst)
def main(argv):
"""Script entry point."""
parser = get_arg_parser()
args = parser.parse_args(args=argv[1:])
try:
cloner = PackageCloner(args.virtualenv[0])
except ClonePkgError, e:
print >>sys.stderr, "error: %s" % (str(e),)
return 1
ret = 0
for package in args.packages:
try:
cloner.clone(package, dry_run=args.dryrun)
except ClonePkgError, e:
print >>sys.stderr, "error cloning %r: %s" % (package, str(e))
ret = 2
return ret
if __name__ == "__main__":
sys.exit(main(sys.argv))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment