Skip to content

Instantly share code, notes, and snippets.

@Arachnid
Created January 13, 2009 19:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Arachnid/46573 to your computer and use it in GitHub Desktop.
Save Arachnid/46573 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
"""Converts Python packages into app-engine-friendly zips or directories."""
import logging
import optparse
import os
import re
import sys
import zipfile
class PackageReader(object):
"""Abstract base class for package-reading functionality."""
def __init__(self, path):
self.path = path
def getFiles(self, prefix):
"""Returns a list of files from the package."""
raise NotImplemented()
def isFile(self, prefix):
"""Checks if the specified file exists in the package."""
raise NotImplemented()
def isDir(self, prefix):
"""Checks if the specified directory exists in the package."""
raise NotImplemented()
def getFile(self, path):
"""Returns the bytes of the file."""
raise NotImplemented()
def copyTo(self, src, dest):
"""Copies the specified package component."""
raise NotImplemented()
def addToZip(self, src, zip):
"""Adds the specified package component to a zipfile."""
raise NotImplemented()
class DirPackageReader(PackageReader):
def __init__(self, path):
if not path.endswith('/'):
path += '/'
super(DirPackageReader, self).__init__(path)
def getFiles(self, prefix):
walkdir = os.path.join(self.path, prefix)
for dirpath, dirnames, filenames in os.walk(walkdir):
for filename in filenames:
yield os.path.join(dirpath[len(self.path):], filename)
def isFile(self, path):
path = os.path.join(self.path, path)
return os.path.isfile(path)
def isDir(self, path):
path = os.path.join(self.path, path)
return os.path.isdir(path)
def getFile(self, src):
src = os.path.join(self.path, src)
srcfile = open(src, 'rb')
data = srcfile.read()
srcfile.close()
return data
class ZipPackageReader(PackageReader):
def __init__(self, path, prefix=''):
super(ZipPackageReader, self).__init__(path)
self.zip = zipfile.ZipFile(path, 'r')
self.prefix = prefix
self.manifest = {}
self.dirs = set()
def _buildManifest(self):
if not self.manifest:
for zi in self.zip.infolist():
dir, filename = os.path.split(zi.filename)
self.manifest.setdefault(dir, set()).add(filename)
while dir:
self.dirs.add(dir)
dir, filename = os.path.split(dir)
def getFiles(self, dir):
self._buildManifest()
listing = self.manifest.get(os.path.join(self.prefix, dir), set())
return [os.path.join(dir, x) for x in listing]
def isFile(self, path):
self._buildManifest()
path = os.path.join(self.prefix, path)
dir, filename = os.path.split(path)
if dir not in self.manifest:
return False
return filename in self.manifest[dir]
def isDir(self, path):
self._buildManifest()
path = os.path.join(self.prefix, path)
return path in self.dirs
def getFile(self, src):
path = os.path.join(self.prefix, src)
return self.zip.read(path)
def getPackages(path=None):
"""Returns an iterator of PackageReader objects for the components of path."""
if not path:
path = sys.path
for loc in path:
if os.path.isdir(loc):
yield DirPackageReader(loc)
elif os.path.isfile(loc):
yield ZipPackageReader(loc)
else:
tail = 'dummy'
prefix = ''
while tail and not os.path.exists(loc):
loc, tail = os.path.split(loc)
prefix = os.path.join(tail, prefix)
if os.path.isfile(loc):
yield ZipPackageReader(loc, prefix)
def findPackage(name, path=None):
"""Finds the specified package on the path and returns it.
The return value is a (is_dir, basename, PackageReader) tuple if found, or
(None, None, None) if not found.
"""
for package in getPackages(path):
if package.isDir(name):
return (True, name, package)
elif package.isFile("%s.py" % name):
return (False, "%s.py" % name, package)
return (None, None, None)
class PackageWriter(object):
def writeFile(self, reader, src):
raise NotImplemented()
def close(self):
raise NotImplemented()
class DirPackageWriter(object):
def __init__(self, path):
self.path = path
def writeFile(self, reader, src):
outname = os.path.join(self.path, src)
outdir = os.path.dirname(outname)
if not os.path.isdir(outdir):
os.makedirs(os.path.dirname(outname))
outfile = open(outname, "wb")
outfile.write(reader.getFile(src))
outfile.close()
def close(self):
pass
class ZipPackageWriter(object):
def __init__(self, path, name):
self.zip = zipfile.ZipFile(os.path.join(path, name), "w")
def writeFile(self, reader, src):
self.zip.writestr(src, reader.getFile(src))
def close(self):
self.zip.close()
class PackagerApp(object):
ignore_paths = [
'\.py[oc]$',
'^EGG-INFO/',
'^tests/',
'\.so$',
'\.dylib$',
]
ignore_re = re.compile('|'.join(ignore_paths))
def getOptionParser(self):
parser = optparse.OptionParser(usage='%prog [options] package ... dest',
description=__doc__)
parser.add_option("-q", "--quiet", action="store_const",
const=logging.ERROR, dest="verbosity",
default=logging.WARN, help="Print errors only")
parser.add_option("-v", "--verbose", action="store_const",
const=logging.DEBUG, dest="verbosity",
help="Print everything")
parser.add_option("-u", "--nozip", action="store_true", dest="nozip",
help="Don't zip: Output a directory instead of a zipfile")
parser.add_option("-n", "--zipname", action="store", dest="zipname",
help="Select the name of the zip to create.")
return parser
def copyPackage(self, reader, writer, pkg):
for fn in reader.getFiles(pkg):
if fn.endswith('.so') or fn.endswith('.dylib'):
logging.warn(" Warning: Found native module '%s' in package '%s'. "
"Native modules are not supported in App Engine; this "
"package may not work." % (fn, pkg))
if self.ignore_re.search(fn):
logging.info(" Skipping file '%s'", fn)
else:
logging.info(" Copying file '%s'", fn)
writer.writeFile(reader, fn)
def package(self, sources, dest):
if self.options.nozip:
writer = DirPackageWriter(dest)
else:
if self.options.zipname:
writer = ZipPackageWriter(dest, self.options.zipname)
else:
writer = ZipPackageWriter(dest, '%s.zip' % sources[0])
for source in sources:
isdir, basename, reader = findPackage(source)
if reader:
logging.info("Found package '%s' at '%s'", source, reader.path)
else:
logging.error("Could not find package '%s'." % source)
return 1
if not isdir:
logging.info("Writing single-file package '%s'" % (basename,))
writer.writeFile(reader, basename)
return 0
else:
logging.info("Copying package '%s'" % (source,))
self.copyPackage(reader, writer, source)
writer.close()
return 0
def run(self, args):
self.parser = self.getOptionParser()
self.options, self.args = self.parser.parse_args(args[1:])
if len(self.args) < 2:
self.parser.print_help()
return 1
if self.options.nozip and self.options.zipname:
self.parser.error("Cannot specify both --nozip and --zipname")
logging.basicConfig(format="%(message)s", level=self.options.verbosity)
sources = self.args[:-1]
dest = self.args[-1]
return self.package(sources, dest)
def main(args):
PackagerApp().run(args)
if __name__ == '__main__':
sys.exit(main(sys.argv))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment