Created
January 17, 2010 21:45
-
-
Save srid/279606 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import os | |
from os.path import (abspath, basename, exists, isfile, isdir, | |
pardir, join, dirname, relpath, realpath) | |
import tarfile | |
import zipfile | |
from contextlib import closing | |
from pypm.common import console | |
def unpack(file, path='.'):
    """Extract ``file`` under the directory ``path``

    The archive format is detected by asking each supported implementor
    (gzip tar, zip, bzip2 tar -- in that order) whether it accepts the
    file; the first one that does performs the extraction.

    Return a two-item list: [path to the unpacked directory, implementor
    class used].

    Raise ``InvalidFile`` if none of the implementors recognizes the file.
    """
    assert isfile(file)
    assert isdir(path)

    for implementor in (GzipTarredFile, ZippedFile, Bzip2TarredFile):
        if implementor.is_valid(file):
            # The implementors call ``extractall()`` without a target, i.e.
            # they unpack into the current directory; ``console.cd`` is
            # presumably a chdir context manager -- confirm in pypm.common.
            with console.cd(path):
                return [implementor(file).extract(), implementor]

    # No implementor matched.  The call form of ``raise`` is valid on both
    # Python 2 and Python 3 (the ``raise X, msg`` form is Python 2 only).
    raise InvalidFile('compressed file format unknown: %s' % file)
def pack_contents(file, d, implementor=None):
    """Pack every entry found directly inside directory ``d`` into ``file``.

    ``implementor`` is handed to ``_pack_ex`` unchanged; the value returned
    by ``_pack_ex`` is returned as-is.
    """
    # Resolve symlinks up front to avoid symlink confusion in _pack_ex.
    real_dir = realpath(d)
    entries = os.listdir(real_dir)
    return _pack_ex(file, entries, real_dir, implementor)
def pack_single(file, path, implementor=None):
    """Pack one path (usually a directory) into ``file``.

    The path is packed relative to its parent directory.  ``implementor``
    is handed to ``_pack_ex`` unchanged; the value returned by ``_pack_ex``
    is returned as-is.
    """
    # Resolve symlinks up front to avoid symlink confusion in _pack_ex.
    real_path = realpath(path)
    parent_dir = dirname(real_path)
    return _pack_ex(file, [real_path], parent_dir, implementor)
def _pack_ex(file, names, cwd, implementor=None):
    """Compress the paths given by ``names`` from directory ``cwd``

    The archive is written to ``file`` by ``implementor`` (a class exposing
    ``pack(paths, file)``); ``GzipTarredFile`` is used when no implementor
    is supplied.  An already existing ``file`` is removed first.

    Return ``file``.
    """
    assert isdir(cwd)

    # Start from a clean slate: drop any previous archive of the same name.
    # NOTE(review): this check runs before entering ``cwd`` while the archive
    # is created after entering it, so a relative ``file`` may refer to two
    # different locations -- confirm callers always pass absolute paths.
    if exists(file):
        console.rm(file)

    packer = implementor if implementor else GzipTarredFile

    with console.cd(cwd):
        # Store members relative to ``cwd`` rather than by absolute path.
        packer.pack([relpath(entry, cwd) for entry in names], file)

    return file
class CompressedFile(object):
    """Base class for the archive wrappers; remembers the archive's path."""

    def __init__(self, filename):
        # Path of the archive this object represents.
        self.filename = filename
class ZippedFile(CompressedFile):
    """A zip file"""

    @staticmethod
    def is_valid(filename):
        """Return True if ``zipfile`` recognizes ``filename`` as a zip file."""
        return zipfile.is_zipfile(filename)

    def extract(self):
        """Extract the archive into the current working directory.

        Return the directory computed by ``_possible_dir_name`` from the
        archive's member names.

        Raise ``InvalidFile`` when the archive is corrupt or too large
        (``BadZipfile`` / ``LargeZipFile``), when extraction fails with
        errno 17, or -- on Windows -- when a member uses a reserved name
        (winerror 267).  Any other ``OSError`` is re-raised unchanged.
        """
        # ``except ... as`` and call-style ``raise`` are used throughout so
        # the code parses on both Python 2.6+ and Python 3.
        try:
            f = zipfile.ZipFile(self.filename, 'r')
            try:
                f.extractall()
                return _possible_dir_name(f.namelist())
            except OSError as e:
                if e.errno == 17:  # EEXIST
                    # http://bugs.python.org/issue6510
                    raise InvalidFile(e)
                # http://bugs.python.org/issue6609
                if sys.platform.startswith('win'):
                    # ``WindowsError`` is only looked up on Windows, thanks
                    # to the platform guard above.
                    if isinstance(e, WindowsError) and e.winerror == 267:
                        raise InvalidFile(
                            'uses Windows special name (%s)' % e)
                # Not one of the known-bad cases: let the caller see it.
                raise
            finally:
                f.close()
        except (zipfile.BadZipfile, zipfile.LargeZipFile) as e:
            raise InvalidFile(e)

    @classmethod
    def pack(cls, paths, file):
        """Not implemented: creating zip archives is unsupported."""
        raise NotImplementedError('pack: zip files not supported yet')
class TarredFile(CompressedFile):
    """A tar.gz/bz2 file

    Abstract base: subclasses provide ``_get_mode`` to select the
    compression suffix passed to ``tarfile.open``.
    """

    @classmethod
    def is_valid(cls, filename):
        """Return True if ``filename`` opens as a tarball in this class's mode.

        Only ``tarfile.TarError`` is treated as "not valid"; other errors
        propagate.
        """
        try:
            with closing(tarfile.open(filename, cls._get_mode())):
                return True
        except tarfile.TarError:
            return False

    def extract(self):
        """Extract the tarball into the current working directory.

        Return the directory computed by ``_possible_dir_name`` from the
        archive's member names.

        Raise ``InvalidFile`` on any ``tarfile.TarError`` or on an
        ``IOError`` reporting a CRC failure; other ``IOError``s are
        re-raised unchanged.
        """
        # ``except ... as`` and call-style ``raise`` are used so the code
        # parses on both Python 2.6+ and Python 3.
        try:
            f = tarfile.open(self.filename, self._get_mode())
            try:
                # Make sure the extracted tree stays readable/removable.
                _ensure_read_write_access(f)
                # NOTE(review): extractall() trusts member names; do not
                # feed it archives from untrusted sources without checks.
                f.extractall()
                return _possible_dir_name(f.getnames())
            finally:
                f.close()
        except tarfile.TarError as e:
            raise InvalidFile(e)
        except IOError as e:
            # see http://bugs.python.org/issue6584
            if 'CRC check failed' in str(e):
                raise InvalidFile(e)
            else:
                raise

    @classmethod
    def pack(cls, paths, file):
        """Create tarball ``file`` containing each path in ``paths``.

        Every path must exist (checked with an assertion before adding).
        """
        f = tarfile.open(file, cls._get_mode('w'))
        try:
            for path in paths:
                assert exists(path), '"%s" does not exist' % path
                f.add(path)
        finally:
            f.close()

    @staticmethod
    def _get_mode(mode='r'):
        """Return the ``tarfile.open`` mode string for this tarfile kind.

        Declared static with the same signature as the subclass overrides,
        because it is invoked both as ``cls._get_mode()`` and
        ``cls._get_mode('w')``.  Subclasses must override it.
        """
        raise NotImplementedError
class GzipTarredFile(TarredFile):
    """A tar.gz file"""

    @staticmethod
    def _get_mode(mode='r'):
        """Return ``mode`` ('r' or 'w') with the gzip suffix appended."""
        assert mode in ('r', 'w')
        return mode + ':gz'
class Bzip2TarredFile(TarredFile):
    """A tar.bz2 file"""

    @staticmethod
    def _get_mode(mode='r'):
        """Return ``mode`` ('r' or 'w') with the bzip2 suffix appended."""
        assert mode in ('r', 'w')
        return mode + ':bz2'
class InvalidFile(Exception):
    """The given compressed file is invalid and cannot be properly
    extracted.
    """
class MultipleTopLevels(InvalidFile):
    """The archive can be extracted, but it contains more than one
    top-level entry.
    """
class SingleFile(InvalidFile):
    """The archive contains nothing but a single file, whereas a compressed
    archive is expected to contain one directory.
    """
def _possible_dir_name(contents):
    """The directory where the files are possibly extracted.

    ``contents`` is the list of member names of an archive that has already
    been extracted into the current working directory.

    Return the absolute path of the single top-level directory.

    Raise ``InvalidFile`` if there are no members, ``MultipleTopLevels`` if
    the members do not share one top-level entry, and ``SingleFile`` if the
    only top-level entry is not a directory.
    """
    # Archive member names are split on '/' regardless of platform.
    top_level_dirs = _find_top_level_directories(contents, sep='/')

    # Call-style ``raise`` is valid on both Python 2 and Python 3.
    if not top_level_dirs:
        raise InvalidFile('has no contents')
    if len(top_level_dirs) > 1:
        raise MultipleTopLevels(
            'more than one top levels: %s' % top_level_dirs)

    d = abspath(top_level_dirs[0])
    assert exists(d), 'missing dir: %s' % d
    if not isdir(d):
        # eg: http://pypi.python.org/pypi/DeferArgs/0.4
        raise SingleFile('contains a single file: %s' % d)

    return d
def _ensure_read_write_access(tarfileobj):
    """Ensure that the given tarfile will be readable and writable by the
    user (the client program using this API) after extraction.

    Some tarballs have u-x set on directories or u-w on files. We reset such
    perms here.. so that the extracted files remain accessible for reading
    and deletion as per the user's wish.

    The ``mode`` of every member returned by ``tarfileobj.getmembers()`` is
    updated in place: directories gain owner read/write/execute, everything
    else gains owner read/write.  Existing permission bits are kept.

    See also: http://bugs.python.org/issue6196
    """
    # Use the documented ``stat`` constants instead of the undocumented
    # ``tarfile.TUREAD``/``TUWRITE``/``TUEXEC`` names, which are not
    # available in the Python 3 ``tarfile`` module.  The bit values are the
    # same (0400, 0200 and 0100 octal).
    import stat

    dir_perm = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
    file_perm = stat.S_IRUSR | stat.S_IWUSR

    for tarinfo in tarfileobj.getmembers():
        tarinfo.mode |= (dir_perm if tarinfo.isdir() else file_perm)
def _find_top_level_directories(fileslist, sep):
    """Find the distinct first components in the fileslist

    Each path is split once on ``sep`` and only the leading component is
    kept.  The result is a list without duplicates, in no particular order.
    """
    first_components = set(
        entry.split(sep, 1)[0] for entry in fileslist
    )
    return list(first_components)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment