Last active
April 8, 2016 09:29
-
-
Save FreakyBytes/30a6f9866154d82f1c3863f2e4969cc4 to your computer and use it in GitHub Desktop.
fixed modified Python zipfile module
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
stripped down version of the original Python test support module | |
Copyright (c) 2001-2016 Python Software Foundation; All Rights Reserved | |
""" | |
# 1. This LICENSE AGREEMENT is between the Python Software Foundation ("PSF"), and | |
# the Individual or Organization ("Licensee") accessing and otherwise using Python | |
# 2.7.11 software in source or binary form and its associated documentation. | |
# | |
# 2. Subject to the terms and conditions of this License Agreement, PSF hereby | |
# grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce, | |
# analyze, test, perform and/or display publicly, prepare derivative works, | |
# distribute, and otherwise use Python 2.7.11 alone or in any derivative | |
# version, provided, however, that PSF's License Agreement and PSF's notice of | |
# copyright, i.e., "Copyright (c) 2001-2016 Python Software Foundation; All Rights | |
# Reserved" are retained in Python 2.7.11 alone or in any derivative version | |
# prepared by Licensee. | |
# | |
# 3. In the event Licensee prepares a derivative work that is based on or | |
# incorporates Python 2.7.11 or any part thereof, and wants to make the | |
# derivative work available to others as provided herein, then Licensee hereby | |
# agrees to include in any such work a brief summary of the changes made to Python | |
# 2.7.11. | |
# | |
# 4. PSF is making Python 2.7.11 available to Licensee on an "AS IS" basis. | |
# PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED. BY WAY OF | |
# EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND DISCLAIMS ANY REPRESENTATION OR | |
# WARRANTY OF MERCHANTABILITY OR FITNESS FOR ANY PARTICULAR PURPOSE OR THAT THE | |
# USE OF PYTHON 2.7.11 WILL NOT INFRINGE ANY THIRD PARTY RIGHTS. | |
# | |
# 5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON 2.7.11 | |
# FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS A RESULT OF | |
# MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON 2.7.11, OR ANY DERIVATIVE | |
# THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF. | |
# | |
# 6. This License Agreement will automatically terminate upon a material breach of | |
# its terms and conditions. | |
# | |
# 7. Nothing in this License Agreement shall be deemed to create any relationship | |
# of agency, partnership, or joint venture between PSF and Licensee. This License | |
# Agreement does not grant permission to use PSF trademarks or trade name in a | |
# trademark sense to endorse or promote products or services of Licensee, or any | |
# third party. | |
# | |
# 8. By copying, installing or otherwise using Python 2.7.11, Licensee agrees | |
# to be bound by the terms and conditions of this License Agreement. | |
import os | |
import errno | |
import unittest | |
import sys | |
# Filename used for testing | |
if os.name == 'java': | |
# Jython disallows @ in module names | |
TESTFN = '$test' | |
else: | |
TESTFN = '@test' | |
# Disambiguate TESTFN for parallel testing, while letting it remain a valid | |
# module name. | |
TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) | |
verbose = 1 # Flag set to 0 by regrtest.py | |
class BasicTestRunner: | |
def run(self, test): | |
result = unittest.TestResult() | |
test(result) | |
return result | |
class Error(Exception): | |
"""Base class for regression test exceptions.""" | |
class TestFailed(Error): | |
"""Test failed.""" | |
def _run_suite(suite): | |
"""Run tests from a unittest.TestSuite-derived class.""" | |
if verbose: | |
runner = unittest.TextTestRunner(sys.stdout, verbosity=2) | |
else: | |
runner = BasicTestRunner() | |
result = runner.run(suite) | |
if not result.wasSuccessful(): | |
if len(result.errors) == 1 and not result.failures: | |
err = result.errors[0][1] | |
elif len(result.failures) == 1 and not result.errors: | |
err = result.failures[0][1] | |
else: | |
err = "multiple errors occurred" | |
if not verbose: err += "; run in verbose mode for details" | |
raise TestFailed(err) | |
def run_unittest(*classes): | |
"""Run tests from unittest.TestCase-derived classes.""" | |
valid_types = (unittest.TestSuite, unittest.TestCase) | |
suite = unittest.TestSuite() | |
for cls in classes: | |
if isinstance(cls, str): | |
if cls in sys.modules: | |
suite.addTest(unittest.findTestCases(sys.modules[cls])) | |
else: | |
raise ValueError("str arguments must be keys in sys.modules") | |
elif isinstance(cls, valid_types): | |
suite.addTest(cls) | |
else: | |
suite.addTest(unittest.makeSuite(cls)) | |
_run_suite(suite) | |
def findfile(file, here=__file__, subdir=None): | |
"""Try to find a file on sys.path and the working directory. If it is not | |
found the argument passed to the function is returned (this does not | |
necessarily signal failure; could still be the legitimate path).""" | |
if os.path.isabs(file): | |
return file | |
if subdir is not None: | |
file = os.path.join(subdir, file) | |
path = sys.path | |
path = [os.path.dirname(here)] + path | |
for dn in path: | |
fn = os.path.join(dn, file) | |
if os.path.exists(fn): return fn | |
return file | |
def unlink(filename): | |
try: | |
os.unlink(filename) | |
except OSError as error: | |
# The filename need not exist. | |
if error.errno not in (errno.ENOENT, errno.ENOTDIR): | |
raise |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Read and write ZIP files. | |
modified version of the original Python 2.7 zipfile module | |
Copyright (c) 2001-2016 Python Software Foundation; All Rights Reserved | |
""" | |
from __future__ import print_function | |
# 1. This LICENSE AGREEMENT is between the Python Software Foundation ("PSF"), and | |
# the Individual or Organization ("Licensee") accessing and otherwise using Python | |
# 2.7.11 software in source or binary form and its associated documentation. | |
# | |
# 2. Subject to the terms and conditions of this License Agreement, PSF hereby | |
# grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce, | |
# analyze, test, perform and/or display publicly, prepare derivative works, | |
# distribute, and otherwise use Python 2.7.11 alone or in any derivative | |
# version, provided, however, that PSF's License Agreement and PSF's notice of | |
# copyright, i.e., "Copyright (c) 2001-2016 Python Software Foundation; All Rights | |
# Reserved" are retained in Python 2.7.11 alone or in any derivative version | |
# prepared by Licensee. | |
# | |
# 3. In the event Licensee prepares a derivative work that is based on or | |
# incorporates Python 2.7.11 or any part thereof, and wants to make the | |
# derivative work available to others as provided herein, then Licensee hereby | |
# agrees to include in any such work a brief summary of the changes made to Python | |
# 2.7.11. | |
# | |
# 4. PSF is making Python 2.7.11 available to Licensee on an "AS IS" basis. | |
# PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED. BY WAY OF | |
# EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND DISCLAIMS ANY REPRESENTATION OR | |
# WARRANTY OF MERCHANTABILITY OR FITNESS FOR ANY PARTICULAR PURPOSE OR THAT THE | |
# USE OF PYTHON 2.7.11 WILL NOT INFRINGE ANY THIRD PARTY RIGHTS. | |
# | |
# 5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON 2.7.11 | |
# FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS A RESULT OF | |
# MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON 2.7.11, OR ANY DERIVATIVE | |
# THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF. | |
# | |
# 6. This License Agreement will automatically terminate upon a material breach of | |
# its terms and conditions. | |
# | |
# 7. Nothing in this License Agreement shall be deemed to create any relationship | |
# of agency, partnership, or joint venture between PSF and Licensee. This License | |
# Agreement does not grant permission to use PSF trademarks or trade name in a | |
# trademark sense to endorse or promote products or services of Licensee, or any | |
# third party. | |
# | |
# 8. By copying, installing or otherwise using Python 2.7.11, Licensee agrees | |
# to be bound by the terms and conditions of this License Agreement. | |
""" | |
Read and write ZIP files. | |
XXX references to utf-8 need further investigation. | |
""" | |
import io | |
import os | |
import re | |
import imp | |
import sys | |
import time | |
import stat | |
import shutil | |
import struct | |
import binascii | |
try: | |
import zlib # We may need its compression method | |
crc32 = zlib.crc32 | |
except ImportError: | |
zlib = None | |
crc32 = binascii.crc32 | |
__all__ = ["BadZipFile", "BadZipfile", "error", "ZIP_STORED", "ZIP_DEFLATED", | |
"is_zipfile", "ZipInfo", "ZipFile", "PyZipFile", "LargeZipFile"] | |
class BadZipFile(Exception): | |
pass | |
class LargeZipFile(Exception): | |
""" | |
Raised when writing a zipfile, the zipfile requires ZIP64 extensions | |
and those extensions are disabled. | |
""" | |
error = BadZipfile = BadZipFile # Pre-3.2 compatibility names | |
ZIP64_LIMIT = (1 << 31) - 1 | |
ZIP_FILECOUNT_LIMIT = 1 << 16 | |
ZIP_MAX_COMMENT = (1 << 16) - 1 | |
# constants for Zip file compression methods | |
ZIP_STORED = 0 | |
ZIP_DEFLATED = 8 | |
# Other ZIP compression methods not supported | |
# Below are some formats and associated data for reading/writing headers using | |
# the struct module. The names and structures of headers/records are those used | |
# in the PKWARE description of the ZIP file format: | |
# http://www.pkware.com/documents/casestudies/APPNOTE.TXT | |
# (URL valid as of January 2008) | |
# The "end of central directory" structure, magic number, size, and indices | |
# (section V.I in the format document) | |
structEndArchive = b"<4s4H2LH" | |
stringEndArchive = b"PK\005\006" | |
sizeEndCentDir = struct.calcsize(structEndArchive) | |
_ECD_SIGNATURE = 0 | |
_ECD_DISK_NUMBER = 1 | |
_ECD_DISK_START = 2 | |
_ECD_ENTRIES_THIS_DISK = 3 | |
_ECD_ENTRIES_TOTAL = 4 | |
_ECD_SIZE = 5 | |
_ECD_OFFSET = 6 | |
_ECD_COMMENT_SIZE = 7 | |
# These last two indices are not part of the structure as defined in the | |
# spec, but they are used internally by this module as a convenience | |
_ECD_COMMENT = 8 | |
_ECD_LOCATION = 9 | |
# The "central directory" structure, magic number, size, and indices | |
# of entries in the structure (section V.F in the format document) | |
structCentralDir = "<4s4B4HL2L5H2L" | |
stringCentralDir = b"PK\001\002" | |
sizeCentralDir = struct.calcsize(structCentralDir) | |
# indexes of entries in the central directory structure | |
_CD_SIGNATURE = 0 | |
_CD_CREATE_VERSION = 1 | |
_CD_CREATE_SYSTEM = 2 | |
_CD_EXTRACT_VERSION = 3 | |
_CD_EXTRACT_SYSTEM = 4 | |
_CD_FLAG_BITS = 5 | |
_CD_COMPRESS_TYPE = 6 | |
_CD_TIME = 7 | |
_CD_DATE = 8 | |
_CD_CRC = 9 | |
_CD_COMPRESSED_SIZE = 10 | |
_CD_UNCOMPRESSED_SIZE = 11 | |
_CD_FILENAME_LENGTH = 12 | |
_CD_EXTRA_FIELD_LENGTH = 13 | |
_CD_COMMENT_LENGTH = 14 | |
_CD_DISK_NUMBER_START = 15 | |
_CD_INTERNAL_FILE_ATTRIBUTES = 16 | |
_CD_EXTERNAL_FILE_ATTRIBUTES = 17 | |
_CD_LOCAL_HEADER_OFFSET = 18 | |
# The "local file header" structure, magic number, size, and indices | |
# (section V.A in the format document) | |
structFileHeader = "<4s2B4HL2L2H" | |
stringFileHeader = b"PK\003\004" | |
sizeFileHeader = struct.calcsize(structFileHeader) | |
_FH_SIGNATURE = 0 | |
_FH_EXTRACT_VERSION = 1 | |
_FH_EXTRACT_SYSTEM = 2 | |
_FH_GENERAL_PURPOSE_FLAG_BITS = 3 | |
_FH_COMPRESSION_METHOD = 4 | |
_FH_LAST_MOD_TIME = 5 | |
_FH_LAST_MOD_DATE = 6 | |
_FH_CRC = 7 | |
_FH_COMPRESSED_SIZE = 8 | |
_FH_UNCOMPRESSED_SIZE = 9 | |
_FH_FILENAME_LENGTH = 10 | |
_FH_EXTRA_FIELD_LENGTH = 11 | |
_FHF_HAS_DATA_DESCRIPTOR = 0x8 | |
dataDescriptorSignature = 0x08074b50 | |
# The "Zip64 end of central directory locator" structure, magic number, and size | |
structEndArchive64Locator = "<4sLQL" | |
stringEndArchive64Locator = b"PK\x06\x07" | |
sizeEndCentDir64Locator = struct.calcsize(structEndArchive64Locator) | |
# The "Zip64 end of central directory" record, magic number, size, and indices | |
# (section V.G in the format document) | |
structEndArchive64 = "<4sQ2H2L4Q" | |
stringEndArchive64 = b"PK\x06\x06" | |
sizeEndCentDir64 = struct.calcsize(structEndArchive64) | |
_CD64_SIGNATURE = 0 | |
_CD64_DIRECTORY_RECSIZE = 1 | |
_CD64_CREATE_VERSION = 2 | |
_CD64_EXTRACT_VERSION = 3 | |
_CD64_DISK_NUMBER = 4 | |
_CD64_DISK_NUMBER_START = 5 | |
_CD64_NUMBER_ENTRIES_THIS_DISK = 6 | |
_CD64_NUMBER_ENTRIES_TOTAL = 7 | |
_CD64_DIRECTORY_SIZE = 8 | |
_CD64_OFFSET_START_CENTDIR = 9 | |
def _check_zipfile(fp): | |
try: | |
if _EndRecData(fp): | |
return True # file has correct magic number | |
except IOError: | |
pass | |
return False | |
def is_zipfile(filename): | |
"""Quickly see if a file is a ZIP file by checking the magic number. | |
The filename argument may be a file or file-like object too. | |
""" | |
result = False | |
try: | |
if hasattr(filename, "read"): | |
result = _check_zipfile(fp=filename) | |
else: | |
with open(filename, "rb") as fp: | |
result = _check_zipfile(fp) | |
except IOError: | |
pass | |
return result | |
def _EndRecData64(fpin, offset, endrec): | |
""" | |
Read the ZIP64 end-of-archive records and use that to update endrec | |
""" | |
try: | |
fpin.seek(offset - sizeEndCentDir64Locator, 2) | |
except IOError: | |
# If the seek fails, the file is not large enough to contain a ZIP64 | |
# end-of-archive record, so just return the end record we were given. | |
return endrec | |
data = fpin.read(sizeEndCentDir64Locator) | |
sig, diskno, reloff, disks = struct.unpack(structEndArchive64Locator, data) | |
if sig != stringEndArchive64Locator: | |
return endrec | |
if diskno != 0 or disks != 1: | |
raise BadZipFile("zipfiles that span multiple disks are not supported") | |
# Assume no 'zip64 extensible data' | |
fpin.seek(offset - sizeEndCentDir64Locator - sizeEndCentDir64, 2) | |
data = fpin.read(sizeEndCentDir64) | |
sig, sz, create_version, read_version, disk_num, disk_dir, \ | |
dircount, dircount2, dirsize, diroffset = \ | |
struct.unpack(structEndArchive64, data) | |
if sig != stringEndArchive64: | |
return endrec | |
# Update the original endrec using data from the ZIP64 record | |
endrec[_ECD_SIGNATURE] = sig | |
endrec[_ECD_DISK_NUMBER] = disk_num | |
endrec[_ECD_DISK_START] = disk_dir | |
endrec[_ECD_ENTRIES_THIS_DISK] = dircount | |
endrec[_ECD_ENTRIES_TOTAL] = dircount2 | |
endrec[_ECD_SIZE] = dirsize | |
endrec[_ECD_OFFSET] = diroffset | |
return endrec | |
def _EndRecData(fpin): | |
"""Return data from the "End of Central Directory" record, or None. | |
The data is a list of the nine items in the ZIP "End of central dir" | |
record followed by a tenth item, the file seek offset of this record.""" | |
# Determine file size | |
fpin.seek(0, 2) | |
filesize = fpin.tell() | |
# Check to see if this is ZIP file with no archive comment (the | |
# "end of central directory" structure should be the last item in the | |
# file if this is the case). | |
try: | |
fpin.seek(-sizeEndCentDir, 2) | |
except IOError: | |
return None | |
data = fpin.read() | |
if data[0:4] == stringEndArchive and data[-2:] == b"\000\000": | |
# the signature is correct and there's no comment, unpack structure | |
endrec = struct.unpack(structEndArchive, data) | |
endrec=list(endrec) | |
# Append a blank comment and record start offset | |
endrec.append(b"") | |
endrec.append(filesize - sizeEndCentDir) | |
# Try to read the "Zip64 end of central directory" structure | |
return _EndRecData64(fpin, -sizeEndCentDir, endrec) | |
# Either this is not a ZIP file, or it is a ZIP file with an archive | |
# comment. Search the end of the file for the "end of central directory" | |
# record signature. The comment is the last item in the ZIP file and may be | |
# up to 64K long. It is assumed that the "end of central directory" magic | |
# number does not appear in the comment. | |
maxCommentStart = max(filesize - (1 << 16) - sizeEndCentDir, 0) | |
fpin.seek(maxCommentStart, 0) | |
data = fpin.read() | |
start = data.rfind(stringEndArchive) | |
if start >= 0: | |
# found the magic number; attempt to unpack and interpret | |
recData = data[start:start+sizeEndCentDir] | |
endrec = list(struct.unpack(structEndArchive, recData)) | |
comment = data[start+sizeEndCentDir:] | |
# check that comment length is correct | |
if endrec[_ECD_COMMENT_SIZE] == len(comment): | |
# Append the archive comment and start offset | |
endrec.append(comment) | |
endrec.append(maxCommentStart + start) | |
# Try to read the "Zip64 end of central directory" structure | |
return _EndRecData64(fpin, maxCommentStart + start - filesize, | |
endrec) | |
# Unable to find a valid end of central directory structure | |
return | |
class ZipInfo (object): | |
"""Class with attributes describing each file in the ZIP archive.""" | |
__slots__ = ( | |
'orig_filename', | |
'filename', | |
'date_time', | |
'compress_type', | |
'comment', | |
'extra', | |
'create_system', | |
'create_version', | |
'extract_version', | |
'reserved', | |
'flag_bits', | |
'volume', | |
'internal_attr', | |
'external_attr', | |
'header_offset', | |
'CRC', | |
'compress_size', | |
'file_size', | |
'_raw_time', | |
) | |
def __init__(self, filename="NoName", date_time=(1980,1,1,0,0,0)): | |
self.orig_filename = filename # Original file name in archive | |
# Terminate the file name at the first null byte. Null bytes in file | |
# names are used as tricks by viruses in archives. | |
null_byte = filename.find(chr(0)) | |
if null_byte >= 0: | |
filename = filename[0:null_byte] | |
# This is used to ensure paths in generated ZIP files always use | |
# forward slashes as the directory separator, as required by the | |
# ZIP format specification. | |
if os.sep != "/" and os.sep in filename: | |
filename = filename.replace(os.sep, "/") | |
self.filename = filename # Normalized file name | |
self.date_time = date_time # year, month, day, hour, min, sec | |
# Standard values: | |
self.compress_type = ZIP_STORED # Type of compression for the file | |
self.comment = b"" # Comment for each file | |
self.extra = b"" # ZIP extra data | |
if sys.platform == 'win32': | |
self.create_system = 0 # System which created ZIP archive | |
else: | |
# Assume everything else is unix-y | |
self.create_system = 3 # System which created ZIP archive | |
self.create_version = 20 # Version which created ZIP archive | |
self.extract_version = 20 # Version needed to extract archive | |
self.reserved = 0 # Must be zero | |
self.flag_bits = 0 # ZIP flag bits | |
self.volume = 0 # Volume number of file header | |
self.internal_attr = 0 # Internal attributes | |
self.external_attr = 0 # External file attributes | |
# Other attributes are set by class ZipFile: | |
# header_offset Byte offset to the file header | |
# CRC CRC-32 of the uncompressed file | |
# compress_size Size of the compressed file | |
# file_size Size of the uncompressed file | |
def FileHeader(self): | |
"""Return the per-file header as a string.""" | |
dt = self.date_time | |
dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2] | |
dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2) | |
if self.flag_bits & _FHF_HAS_DATA_DESCRIPTOR: | |
# Set these to zero because we write them after the file data | |
CRC = compress_size = file_size = 0 | |
else: | |
CRC = self.CRC | |
compress_size = self.compress_size | |
file_size = self.file_size | |
extra = self.extra | |
if file_size > ZIP64_LIMIT or compress_size > ZIP64_LIMIT: | |
# File is larger than what fits into a 4 byte integer, | |
# fall back to the ZIP64 extension | |
fmt = '<HHQQ' | |
extra = extra + struct.pack(fmt, | |
1, struct.calcsize(fmt)-4, file_size, compress_size) | |
file_size = 0xffffffff | |
compress_size = 0xffffffff | |
self.extract_version = max(45, self.extract_version) | |
self.create_version = max(45, self.extract_version) | |
filename, flag_bits = self._encodeFilenameFlags() | |
header = struct.pack(structFileHeader, stringFileHeader, | |
self.extract_version, self.reserved, flag_bits, | |
self.compress_type, dostime, dosdate, CRC, | |
compress_size, file_size, | |
len(filename), len(extra)) | |
return header + filename + extra | |
def _encodeFilenameFlags(self): | |
if sys.version_info[0] >= 3 or isinstance(self.filename, unicode): | |
try: | |
return self.filename.encode('ascii'), self.flag_bits | |
except UnicodeEncodeError: | |
return self.filename.encode('utf-8'), self.flag_bits | 0x800 | |
else: | |
return self.filename, self.flag_bits | |
def _decodeExtra(self): | |
# Try to decode the extra field. | |
extra = self.extra | |
unpack = struct.unpack | |
while extra: | |
tp, ln = unpack('<HH', extra[:4]) | |
if tp == 1: | |
if ln >= 24: | |
counts = unpack('<QQQ', extra[4:28]) | |
elif ln == 16: | |
counts = unpack('<QQ', extra[4:20]) | |
elif ln == 8: | |
counts = unpack('<Q', extra[4:12]) | |
elif ln == 0: | |
counts = () | |
else: | |
raise RuntimeError("Corrupt extra field %s"%(ln,)) | |
idx = 0 | |
# ZIP64 extension (large files and/or large archives) | |
if self.file_size in (0xffffffffffffffff, 0xffffffff): | |
self.file_size = counts[idx] | |
idx += 1 | |
if self.compress_size == 0xFFFFFFFF: | |
self.compress_size = counts[idx] | |
idx += 1 | |
if self.header_offset == 0xffffffff: | |
old = self.header_offset | |
self.header_offset = counts[idx] | |
idx+=1 | |
extra = extra[ln+4:] | |
class _ZipDecrypter: | |
"""Class to handle decryption of files stored within a ZIP archive. | |
ZIP supports a password-based form of encryption. Even though known | |
plaintext attacks have been found against it, it is still useful | |
to be able to get data out of such a file. | |
Usage: | |
zd = _ZipDecrypter(mypwd) | |
plain_char = zd(cypher_char) | |
plain_text = map(zd, cypher_text) | |
""" | |
def _GenerateCRCTable(): | |
"""Generate a CRC-32 table. | |
ZIP encryption uses the CRC32 one-byte primitive for scrambling some | |
internal keys. We noticed that a direct implementation is faster than | |
relying on binascii.crc32(). | |
""" | |
poly = 0xedb88320 | |
table = [0] * 256 | |
for i in range(256): | |
crc = i | |
for j in range(8): | |
if crc & 1: | |
crc = ((crc >> 1) & 0x7FFFFFFF) ^ poly | |
else: | |
crc = ((crc >> 1) & 0x7FFFFFFF) | |
table[i] = crc | |
return table | |
crctable = _GenerateCRCTable() | |
def _crc32(self, ch, crc): | |
"""Compute the CRC32 primitive on one byte.""" | |
if isinstance(ch, (str, unicode)): | |
ch = ord(ch) | |
return ((crc >> 8) & 0xffffff) ^ self.crctable[(crc ^ ch) & 0xff] | |
def __init__(self, pwd): | |
self.key0 = 305419896 | |
self.key1 = 591751049 | |
self.key2 = 878082192 | |
for p in pwd: | |
self._UpdateKeys(p) | |
def _UpdateKeys(self, c): | |
self.key0 = self._crc32(c, self.key0) | |
self.key1 = (self.key1 + (self.key0 & 255)) & 4294967295 | |
self.key1 = (self.key1 * 134775813 + 1) & 4294967295 | |
self.key2 = self._crc32((self.key1 >> 24) & 255, self.key2) | |
def __call__(self, c): | |
"""Decrypt a single character.""" | |
if isinstance(c, (str, unicode)): | |
c = ord(c) | |
k = self.key2 | 2 | |
c = c ^ (((k * (k^1)) >> 8) & 255) | |
self._UpdateKeys(c) | |
return c | |
class ZipExtFile(io.BufferedIOBase): | |
"""File-like object for reading an archive member. | |
Is returned by ZipFile.open(). | |
""" | |
# Max size supported by decompressor. | |
MAX_N = 1 << 31 - 1 | |
# Read from compressed files in 4k blocks. | |
MIN_READ_SIZE = 4096 | |
# Search for universal newlines or line chunks. | |
PATTERN = re.compile(br'^(?P<chunk>[^\r\n]+)|(?P<newline>\n|\r\n?)') | |
def __init__(self, fileobj, mode, zipinfo, decrypter=None, | |
close_fileobj=False): | |
self._fileobj = fileobj | |
self._decrypter = decrypter | |
self._close_fileobj = close_fileobj | |
self._compress_type = zipinfo.compress_type | |
self._compress_size = zipinfo.compress_size | |
self._compress_left = zipinfo.compress_size | |
if self._compress_type == ZIP_DEFLATED: | |
self._decompressor = zlib.decompressobj(-15) | |
self._unconsumed = b'' | |
self._readbuffer = b'' | |
self._offset = 0 | |
self._universal = 'U' in mode | |
self.newlines = None | |
# Adjust read size for encrypted files since the first 12 bytes | |
# are for the encryption/password information. | |
if self._decrypter is not None: | |
self._compress_left -= 12 | |
self.mode = mode | |
self.name = zipinfo.filename | |
if hasattr(zipinfo, 'CRC'): | |
self._expected_crc = zipinfo.CRC | |
self._running_crc = crc32(b'') & 0xffffffff | |
else: | |
self._expected_crc = None | |
def readline(self, limit=-1): | |
"""Read and return a line from the stream. | |
If limit is specified, at most limit bytes will be read. | |
""" | |
if not self._universal and limit < 0: | |
# Shortcut common case - newline found in buffer. | |
i = self._readbuffer.find(b'\n', self._offset) + 1 | |
if i > 0: | |
line = self._readbuffer[self._offset: i] | |
self._offset = i | |
return line | |
if not self._universal: | |
return io.BufferedIOBase.readline(self, limit) | |
line = b'' | |
while limit < 0 or len(line) < limit: | |
readahead = self.peek(2) | |
if readahead == b'': | |
return line | |
# | |
# Search for universal newlines or line chunks. | |
# | |
# The pattern returns either a line chunk or a newline, but not | |
# both. Combined with peek(2), we are assured that the sequence | |
# '\r\n' is always retrieved completely and never split into | |
# separate newlines - '\r', '\n' due to coincidental readaheads. | |
# | |
match = self.PATTERN.search(readahead) | |
newline = match.group('newline') | |
if newline is not None: | |
if self.newlines is None: | |
self.newlines = [] | |
if newline not in self.newlines: | |
self.newlines.append(newline) | |
self._offset += len(newline) | |
return line + b'\n' | |
chunk = match.group('chunk') | |
if limit >= 0: | |
chunk = chunk[: limit - len(line)] | |
self._offset += len(chunk) | |
line += chunk | |
return line | |
def peek(self, n=1): | |
"""Returns buffered bytes without advancing the position.""" | |
if n > len(self._readbuffer) - self._offset: | |
chunk = self.read(n) | |
self._offset -= len(chunk) | |
# Return up to 512 bytes to reduce allocation overhead for tight loops. | |
return self._readbuffer[self._offset: self._offset + 512] | |
def readable(self): | |
return True | |
def read(self, n=-1): | |
"""Read and return up to n bytes. | |
If the argument is omitted, None, or negative, data is read and returned until EOF is reached.. | |
""" | |
buf = b'' | |
if n is None: | |
n = -1 | |
while True: | |
if n < 0: | |
data = self.read1(n) | |
elif n > len(buf): | |
data = self.read1(n - len(buf)) | |
else: | |
return buf | |
if len(data) == 0: | |
return buf | |
buf += data | |
def _update_crc(self, newdata, eof): | |
# Update the CRC using the given data. | |
if self._expected_crc is None: | |
# No need to compute the CRC if we don't have a reference value | |
return | |
self._running_crc = crc32(newdata, self._running_crc) & 0xffffffff | |
# Check the CRC if we're at the end of the file | |
if eof and self._running_crc != self._expected_crc: | |
raise BadZipFile("Bad CRC-32 for file %r" % self.name) | |
def read1(self, n): | |
"""Read up to n bytes with at most one read() system call.""" | |
# Simplify algorithm (branching) by transforming negative n to large n. | |
if n < 0 or n is None: | |
n = self.MAX_N | |
# Bytes available in read buffer. | |
len_readbuffer = len(self._readbuffer) - self._offset | |
# Read from file. | |
if self._compress_left > 0 and n > len_readbuffer + len(self._unconsumed): | |
nbytes = n - len_readbuffer - len(self._unconsumed) | |
nbytes = max(nbytes, self.MIN_READ_SIZE) | |
nbytes = min(nbytes, self._compress_left) | |
data = self._fileobj.read(nbytes) | |
self._compress_left -= len(data) | |
if data and self._decrypter is not None: | |
data = ''.join(chr(i) for i in map(self._decrypter, data)) | |
if self._compress_type == ZIP_STORED: | |
self._update_crc(data, eof=(self._compress_left==0)) | |
self._readbuffer = self._readbuffer[self._offset:] + data | |
self._offset = 0 | |
else: | |
# Prepare deflated bytes for decompression. | |
self._unconsumed += data | |
# Handle unconsumed data. | |
if (len(self._unconsumed) > 0 and n > len_readbuffer and | |
self._compress_type == ZIP_DEFLATED): | |
data = self._decompressor.decompress( | |
self._unconsumed, | |
max(n - len_readbuffer, self.MIN_READ_SIZE) | |
) | |
self._unconsumed = self._decompressor.unconsumed_tail | |
eof = len(self._unconsumed) == 0 and self._compress_left == 0 | |
if eof: | |
data += self._decompressor.flush() | |
self._update_crc(data, eof=eof) | |
self._readbuffer = self._readbuffer[self._offset:] + data | |
self._offset = 0 | |
# Read from buffer. | |
data = self._readbuffer[self._offset: self._offset + n] | |
self._offset += len(data) | |
return data | |
def close(self): | |
try: | |
if self._close_fileobj: | |
self._fileobj.close() | |
finally: | |
super(ZipExtFile, self).close() | |
class ZipFile(object): | |
""" Class with methods to open, read, write, remove, close, list zip files. | |
z = ZipFile(file, mode="r", compression=ZIP_STORED, allowZip64=False) | |
file: Either the path to the file, or a file-like object. | |
If it is a path, the file will be opened and closed by ZipFile. | |
mode: The mode can be either read "r", write "w" or append "a". | |
compression: ZIP_STORED (no compression) or ZIP_DEFLATED (requires zlib). | |
allowZip64: if True ZipFile will create files with ZIP64 extensions when | |
needed, otherwise it will raise an exception when this would | |
be necessary. | |
""" | |
fp = None # Set here since __del__ checks it | |
def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=False): | |
"""Open the ZIP file with mode read "r", write "w" or append "a".""" | |
if mode not in ("r", "w", "a"): | |
raise RuntimeError('ZipFile() requires mode "r", "w", or "a"') | |
if compression == ZIP_STORED: | |
pass | |
elif compression == ZIP_DEFLATED: | |
if not zlib: | |
raise RuntimeError( | |
"Compression requires the (missing) zlib module") | |
else: | |
raise RuntimeError("That compression method is not supported") | |
self._allowZip64 = allowZip64 | |
self._didModify = False | |
self.debug = 0 # Level of printing: 0 through 3 | |
self.NameToInfo = {} # Find file info given name | |
self.filelist = [] # List of ZipInfo instances for archive | |
self.compression = compression # Method of compression | |
self.mode = key = mode.replace('b', '')[0] | |
self.pwd = None | |
self.comment = b'' | |
# Check if we were passed a file-like object | |
if isinstance(file, str): | |
# No, it's a filename | |
self._filePassed = 0 | |
self.filename = file | |
modeDict = {'r' : 'rb', 'w': 'wb', 'a' : 'r+b'} | |
try: | |
self.fp = io.open(file, modeDict[mode]) | |
except IOError: | |
if mode == 'a': | |
mode = key = 'w' | |
self.fp = io.open(file, modeDict[mode]) | |
else: | |
raise | |
else: | |
self._filePassed = 1 | |
self.fp = file | |
self.filename = getattr(file, 'name', None) | |
if key == 'r': | |
self._GetContents() | |
elif key == 'w': | |
# set the modified flag so central directory gets written | |
# even if no files are added to the archive | |
self._didModify = True | |
elif key == 'a': | |
try: | |
# See if file is a zip file | |
self._RealGetContents() | |
# seek to start of directory and overwrite | |
self.fp.seek(self.start_dir, 0) | |
except BadZipFile: | |
# file is not a zip file, just append | |
self.fp.seek(0, 2) | |
# set the modified flag so central directory gets written | |
# even if no files are added to the archive | |
self._didModify = True | |
else: | |
if not self._filePassed: | |
self.fp.close() | |
self.fp = None | |
raise RuntimeError('Mode must be "r", "w" or "a"') | |
def __enter__(self): | |
return self | |
def __exit__(self, type, value, traceback): | |
self.close() | |
def _GetContents(self): | |
"""Read the directory, making sure we close the file if the format | |
is bad.""" | |
try: | |
self._RealGetContents() | |
except BadZipFile: | |
if not self._filePassed: | |
self.fp.close() | |
self.fp = None | |
raise | |
def _RealGetContents(self): | |
"""Read in the table of contents for the ZIP file.""" | |
fp = self.fp | |
try: | |
endrec = _EndRecData(fp) | |
except IOError: | |
raise BadZipFile("File is not a zip file") | |
if not endrec: | |
raise BadZipFile("File is not a zip file") | |
if self.debug > 1: | |
print(endrec) | |
size_cd = endrec[_ECD_SIZE] # bytes in central directory | |
offset_cd = endrec[_ECD_OFFSET] # offset of central directory | |
self.comment = endrec[_ECD_COMMENT] # archive comment | |
# "concat" is zero, unless zip was concatenated to another file | |
concat = endrec[_ECD_LOCATION] - size_cd - offset_cd | |
if endrec[_ECD_SIGNATURE] == stringEndArchive64: | |
# If Zip64 extension structures are present, account for them | |
concat -= (sizeEndCentDir64 + sizeEndCentDir64Locator) | |
if self.debug > 2: | |
inferred = concat + offset_cd | |
print(u"given, inferred, offset", offset_cd, inferred, concat) | |
# self.start_dir: Position of start of central directory | |
self.start_dir = offset_cd + concat | |
fp.seek(self.start_dir, 0) | |
data = fp.read(size_cd) | |
fp = io.BytesIO(data) | |
total = 0 | |
while total < size_cd: | |
centdir = fp.read(sizeCentralDir) | |
if centdir[0:4] != stringCentralDir: | |
raise BadZipFile("Bad magic number for central directory") | |
centdir = struct.unpack(structCentralDir, centdir) | |
if self.debug > 2: | |
print(centdir) | |
filename = fp.read(centdir[_CD_FILENAME_LENGTH]) | |
flags = centdir[5] | |
if flags & 0x800: | |
# UTF-8 file names extension | |
filename = filename.decode('utf-8') | |
else: | |
# Historical ZIP filename encoding | |
filename = filename.decode('cp437') | |
# Create ZipInfo instance to store file information | |
x = ZipInfo(filename) | |
x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH]) | |
x.comment = fp.read(centdir[_CD_COMMENT_LENGTH]) | |
x.header_offset = centdir[_CD_LOCAL_HEADER_OFFSET] | |
(x.create_version, x.create_system, x.extract_version, x.reserved, | |
x.flag_bits, x.compress_type, t, d, | |
x.CRC, x.compress_size, x.file_size) = centdir[1:12] | |
x.volume, x.internal_attr, x.external_attr = centdir[15:18] | |
# Convert date/time code to (year, month, day, hour, min, sec) | |
x._raw_time = t | |
x.date_time = ( (d>>9)+1980, (d>>5)&0xF, d&0x1F, | |
t>>11, (t>>5)&0x3F, (t&0x1F) * 2 ) | |
x._decodeExtra() | |
x.header_offset = x.header_offset + concat | |
self.filelist.append(x) | |
self.NameToInfo[x.filename] = x | |
# update total bytes read from central directory | |
total = (total + sizeCentralDir + centdir[_CD_FILENAME_LENGTH] | |
+ centdir[_CD_EXTRA_FIELD_LENGTH] | |
+ centdir[_CD_COMMENT_LENGTH]) | |
if self.debug > 2: | |
print(u"total", total) | |
def namelist(self): | |
"""Return a list of file names in the archive.""" | |
l = [] | |
for data in self.filelist: | |
l.append(data.filename) | |
return l | |
def infolist(self): | |
"""Return a list of class ZipInfo instances for files in the | |
archive.""" | |
return self.filelist | |
def printdir(self, file=None): | |
"""Print a table of contents for the zip file.""" | |
print(u"%-46s %19s %12s" % (u"File Name", u"Modified ", u"Size"), | |
file=file) | |
for zinfo in self.filelist: | |
date = u"%d-%02d-%02d %02d:%02d:%02d" % zinfo.date_time[:6] | |
print(u"%-46s %s %12d" % (zinfo.filename, date, zinfo.file_size), | |
file=file) | |
def testzip(self): | |
"""Read all the files and check the CRC.""" | |
chunk_size = 2 ** 20 | |
for zinfo in self.filelist: | |
try: | |
# Read by chunks, to avoid an OverflowError or a | |
# MemoryError with very large embedded files. | |
f = self.open(zinfo.filename, "r") | |
while f.read(chunk_size): # Check CRC-32 | |
pass | |
except BadZipFile: | |
return zinfo.filename | |
def getinfo(self, name): | |
"""Return the instance of ZipInfo given 'name'.""" | |
info = self.NameToInfo.get(name) | |
if info is None: | |
raise KeyError( | |
'There is no item named %r in the archive' % name) | |
return info | |
def setpassword(self, pwd): | |
"""Set default password for encrypted files.""" | |
if pwd and not isinstance(pwd, bytes): | |
raise TypeError("pwd: expected bytes, got %s" % type(pwd)) | |
if pwd: | |
self.pwd = pwd | |
else: | |
self.pwd = None | |
def read(self, name, pwd=None): | |
"""Return file bytes (as a string) for name.""" | |
with self.open(name, "r", pwd) as fp: | |
return fp.read() | |
def open(self, name, mode="r", pwd=None): | |
"""Return file-like object for 'name'.""" | |
if mode not in ("r", "U", "rU"): | |
raise RuntimeError('open() requires mode "r", "U", or "rU"') | |
if pwd and not isinstance(pwd, bytes): | |
raise TypeError("pwd: expected bytes, got %s" % type(pwd)) | |
if not self.fp: | |
raise RuntimeError( | |
"Attempt to read ZIP archive that was already closed") | |
# Only open a new file for instances where we were not | |
# given a file object in the constructor | |
if self._filePassed: | |
zef_file = self.fp | |
else: | |
zef_file = io.open(self.filename, 'rb') | |
# Make sure we have an info object | |
if isinstance(name, ZipInfo): | |
# 'name' is already an info object | |
zinfo = name | |
else: | |
# Get info object for name | |
try: | |
zinfo = self.getinfo(name) | |
except KeyError: | |
if not self._filePassed: | |
zef_file.close() | |
raise | |
zef_file.seek(zinfo.header_offset, 0) | |
# Skip the file header: | |
fheader = zef_file.read(sizeFileHeader) | |
if fheader[0:4] != stringFileHeader: | |
raise BadZipFile("Bad magic number for file header") | |
fheader = struct.unpack(structFileHeader, fheader) | |
fname = zef_file.read(fheader[_FH_FILENAME_LENGTH]) | |
if fheader[_FH_EXTRA_FIELD_LENGTH]: | |
zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH]) | |
if zinfo.flag_bits & 0x800: | |
# UTF-8 filename | |
fname_str = fname.decode("utf-8") | |
else: | |
fname_str = fname.decode("cp437") | |
if fname_str != zinfo.orig_filename: | |
if not self._filePassed: | |
zef_file.close() | |
raise BadZipFile( | |
'File name in directory %r and header %r differ.' | |
% (zinfo.orig_filename, fname)) | |
# check for encrypted flag & handle password | |
is_encrypted = zinfo.flag_bits & 0x1 | |
zd = None | |
if is_encrypted: | |
if not pwd: | |
pwd = self.pwd | |
if not pwd: | |
if not self._filePassed: | |
zef_file.close() | |
raise RuntimeError("File %s is encrypted, " | |
"password required for extraction" % name) | |
zd = _ZipDecrypter(pwd) | |
# The first 12 bytes in the cypher stream is an encryption header | |
# used to strengthen the algorithm. The first 11 bytes are | |
# completely random, while the 12th contains the MSB of the CRC, | |
# or the MSB of the file time depending on the header type | |
# and is used to check the correctness of the password. | |
header = zef_file.read(12) | |
h = list(map(zd, header[0:12])) | |
if zinfo.flag_bits & _FHF_HAS_DATA_DESCRIPTOR: | |
# compare against the file type from extended local headers | |
check_byte = (zinfo._raw_time >> 8) & 0xff | |
else: | |
# compare against the CRC otherwise | |
check_byte = (zinfo.CRC >> 24) & 0xff | |
if h[11] != check_byte: | |
if not self._filePassed: | |
zef_file.close() | |
raise RuntimeError("Bad password for file", name) | |
return ZipExtFile(zef_file, mode, zinfo, zd, | |
close_fileobj=not self._filePassed) | |
def extract(self, member, path=None, pwd=None): | |
"""Extract a member from the archive to the current working directory, | |
using its full name. Its file information is extracted as accurately | |
as possible. `member' may be a filename or a ZipInfo object. You can | |
specify a different directory using `path'. | |
""" | |
if not isinstance(member, ZipInfo): | |
member = self.getinfo(member) | |
if path is None: | |
path = os.getcwd() | |
return self._extract_member(member, path, pwd) | |
def extractall(self, path=None, members=None, pwd=None): | |
"""Extract all members from the archive to the current working | |
directory. `path' specifies a different directory to extract to. | |
`members' is optional and must be a subset of the list returned | |
by namelist(). | |
""" | |
if members is None: | |
members = self.namelist() | |
for zipinfo in members: | |
self.extract(zipinfo, path, pwd) | |
def _extract_member(self, member, targetpath, pwd): | |
"""Extract the ZipInfo object 'member' to a physical | |
file on the path targetpath. | |
""" | |
# build the destination pathname, replacing | |
# forward slashes to platform specific separators. | |
# Strip trailing path separator, unless it represents the root. | |
if (targetpath[-1:] in (os.path.sep, os.path.altsep) | |
and len(os.path.splitdrive(targetpath)[1]) > 1): | |
targetpath = targetpath[:-1] | |
# don't include leading "/" from file name if present | |
if member.filename[0] == '/': | |
targetpath = os.path.join(targetpath, member.filename[1:]) | |
else: | |
targetpath = os.path.join(targetpath, member.filename) | |
targetpath = os.path.normpath(targetpath) | |
# Create all upper directories if necessary. | |
upperdirs = os.path.dirname(targetpath) | |
if upperdirs and not os.path.exists(upperdirs): | |
os.makedirs(upperdirs) | |
if member.filename[-1] == '/': | |
if not os.path.isdir(targetpath): | |
os.mkdir(targetpath) | |
return targetpath | |
source = self.open(member, pwd=pwd) | |
target = open(targetpath, "wb") | |
shutil.copyfileobj(source, target) | |
source.close() | |
target.close() | |
return targetpath | |
def _writecheck(self, zinfo): | |
"""Check for errors before writing a file to the archive.""" | |
if zinfo.filename in self.NameToInfo: | |
if self.debug: # Warning for duplicate names | |
print(u"Duplicate name:", zinfo.filename) | |
if self.mode not in ("w", "a"): | |
raise RuntimeError('write() requires mode "w" or "a"') | |
if not self.fp: | |
raise RuntimeError( | |
"Attempt to write ZIP archive that was already closed") | |
if zinfo.compress_type == ZIP_DEFLATED and not zlib: | |
raise RuntimeError( | |
"Compression requires the (missing) zlib module") | |
if zinfo.compress_type not in (ZIP_STORED, ZIP_DEFLATED): | |
raise RuntimeError("That compression method is not supported") | |
if zinfo.file_size > ZIP64_LIMIT: | |
if not self._allowZip64: | |
raise LargeZipFile("Filesize would require ZIP64 extensions") | |
if zinfo.header_offset > ZIP64_LIMIT: | |
if not self._allowZip64: | |
raise LargeZipFile( | |
"Zipfile size would require ZIP64 extensions") | |
def write(self, filename, arcname=None, compress_type=None): | |
"""Put the bytes from filename into the archive under the name | |
arcname.""" | |
if not self.fp: | |
raise RuntimeError( | |
"Attempt to write to ZIP archive that was already closed") | |
st = os.stat(filename) | |
isdir = stat.S_ISDIR(st.st_mode) | |
mtime = time.localtime(st.st_mtime) | |
date_time = mtime[0:6] | |
# Create ZipInfo instance to store file information | |
if arcname is None: | |
arcname = filename | |
arcname = os.path.normpath(os.path.splitdrive(arcname)[1]) | |
while arcname[0] in (os.sep, os.altsep): | |
arcname = arcname[1:] | |
if isdir: | |
arcname += '/' | |
zinfo = ZipInfo(arcname, date_time) | |
zinfo.external_attr = (st[0] & 0xFFFF) << 16 # Unix attributes | |
if compress_type is None: | |
zinfo.compress_type = self.compression | |
else: | |
zinfo.compress_type = compress_type | |
zinfo.file_size = st.st_size | |
zinfo.flag_bits = 0x00 | |
zinfo.header_offset = self.fp.tell() # Start of header bytes | |
self._writecheck(zinfo) | |
self._didModify = True | |
if isdir: | |
zinfo.file_size = 0 | |
zinfo.compress_size = 0 | |
zinfo.CRC = 0 | |
self.filelist.append(zinfo) | |
self.NameToInfo[zinfo.filename] = zinfo | |
self.fp.write(zinfo.FileHeader()) | |
return | |
with open(filename, "rb") as fp: | |
# Must overwrite CRC and sizes with correct data later | |
zinfo.CRC = CRC = 0 | |
zinfo.compress_size = compress_size = 0 | |
zinfo.file_size = file_size = 0 | |
self.fp.write(zinfo.FileHeader()) | |
if zinfo.compress_type == ZIP_DEFLATED: | |
cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, | |
zlib.DEFLATED, -15) | |
else: | |
cmpr = None | |
while 1: | |
buf = fp.read(1024 * 8) | |
if not buf: | |
break | |
file_size = file_size + len(buf) | |
CRC = crc32(buf, CRC) & 0xffffffff | |
if cmpr: | |
buf = cmpr.compress(buf) | |
compress_size = compress_size + len(buf) | |
self.fp.write(buf) | |
if cmpr: | |
buf = cmpr.flush() | |
compress_size = compress_size + len(buf) | |
self.fp.write(buf) | |
zinfo.compress_size = compress_size | |
else: | |
zinfo.compress_size = file_size | |
zinfo.CRC = CRC | |
zinfo.file_size = file_size | |
# Seek backwards and write CRC and file sizes | |
position = self.fp.tell() # Preserve current position in file | |
self.fp.seek(zinfo.header_offset + 14, 0) | |
self.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size, | |
zinfo.file_size)) | |
self.fp.seek(position, 0) | |
self.filelist.append(zinfo) | |
self.NameToInfo[zinfo.filename] = zinfo | |
def writestr(self, zinfo_or_arcname, data, compress_type=None): | |
"""Write a file into the archive. The contents is 'data', which | |
may be either a 'str' or a 'bytes' instance; if it is a 'str', | |
it is encoded as UTF-8 first. | |
'zinfo_or_arcname' is either a ZipInfo instance or | |
the name of the file in the archive.""" | |
if isinstance(data, str) and sys.version_info[0] >= 3: | |
data = data.encode("utf-8") | |
if not isinstance(zinfo_or_arcname, ZipInfo): | |
zinfo = ZipInfo(filename=zinfo_or_arcname, | |
date_time=time.localtime(time.time())[:6]) | |
zinfo.compress_type = self.compression | |
zinfo.external_attr = 0o600 << 16 | |
else: | |
zinfo = zinfo_or_arcname | |
if not self.fp: | |
raise RuntimeError( | |
"Attempt to write to ZIP archive that was already closed") | |
zinfo.file_size = len(data) # Uncompressed size | |
zinfo.header_offset = self.fp.tell() # Start of header data | |
if compress_type is not None: | |
zinfo.compress_type = compress_type | |
self._writecheck(zinfo) | |
self._didModify = True | |
zinfo.CRC = crc32(data) & 0xffffffff # CRC-32 checksum | |
if zinfo.compress_type == ZIP_DEFLATED: | |
co = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, | |
zlib.DEFLATED, -15) | |
data = co.compress(data) + co.flush() | |
zinfo.compress_size = len(data) # Compressed size | |
else: | |
zinfo.compress_size = zinfo.file_size | |
zinfo.header_offset = self.fp.tell() # Start of header data | |
self.fp.write(zinfo.FileHeader()) | |
self.fp.write(data) | |
if zinfo.flag_bits & _FHF_HAS_DATA_DESCRIPTOR: | |
# Write CRC and file sizes after the file data | |
self.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size, | |
zinfo.file_size)) | |
self.fp.flush() | |
self.filelist.append(zinfo) | |
self.NameToInfo[zinfo.filename] = zinfo | |
def _get_data_descriptor_size(self, zinfo): | |
if self.mode not in ("r", "a"): | |
raise RuntimeError('to read the data descriptor requires mode "r" or "a"') | |
if not zinfo.flag_bits & _FHF_HAS_DATA_DESCRIPTOR: | |
return 0 | |
original_fp = self.fp.tell() | |
data_descriptor_fp = zinfo.header_offset + len(zinfo.FileHeader()) + zinfo.compress_size | |
self.fp.seek(data_descriptor_fp) | |
sig_or_not_bin = self.fp.read(4) | |
sig_or_not = struct.unpack('<L', sig_or_not_bin) | |
self.fp.seek(original_fp) | |
# The data descriptor can either have the signature or not, yet | |
# the standards don't specify the signature as an illegal CRC. | |
if sig_or_not == dataDescriptorSignature: | |
return 16 | |
else: | |
return 12 | |
def remove(self, member): | |
"""Remove a file from the archive. Only works if the ZipFile was opened | |
with mode 'a'.""" | |
if "a" not in self.mode: | |
raise RuntimeError('remove() requires mode "a"') | |
if not self.fp: | |
raise RuntimeError( | |
"Attempt to modify ZIP archive that was already closed") | |
fp = self.fp | |
# Make sure we have an info object | |
if isinstance(member, ZipInfo): | |
# 'member' is already an info object | |
zinfo = member | |
else: | |
# Get info object for member | |
zinfo = self.getinfo(member) | |
# start at the pos of the first member (smallest offset) | |
position = min([info.header_offset for info in self.filelist]) # start at the beginning of first file | |
for info in self.filelist: | |
fileheader = info.FileHeader() | |
# is member after delete one? | |
if info.header_offset > zinfo.header_offset and info != zinfo: | |
# rewrite FileHeader and copy compressed data | |
# Skip the file header: | |
fp.seek(info.header_offset) | |
fheader = fp.read(sizeFileHeader) | |
if fheader[0:4] != stringFileHeader: | |
raise BadZipFile("Bad magic number for file header") | |
fheader = struct.unpack(structFileHeader, fheader) | |
fname = fp.read(fheader[_FH_FILENAME_LENGTH]) | |
if fheader[_FH_EXTRA_FIELD_LENGTH]: | |
fp.read(fheader[_FH_EXTRA_FIELD_LENGTH]) | |
if zinfo.flag_bits & 0x800: | |
# UTF-8 filename | |
fname_str = fname.decode("utf-8") | |
else: | |
fname_str = fname.decode("cp437") | |
if fname_str != info.orig_filename: | |
if not self._filePassed: | |
fp.close() | |
raise BadZipFile( | |
'File name in directory %r and header %r differ.' | |
% (zinfo.orig_filename, fname)) | |
# read the actual data | |
data = fp.read(fheader[_FH_COMPRESSED_SIZE]) | |
# modify info obj | |
info.header_offset = position | |
# jump to new position | |
fp.seek(info.header_offset, 0) | |
# write fileheader and data | |
fp.write(fileheader) | |
fp.write(data) | |
if zinfo.flag_bits & _FHF_HAS_DATA_DESCRIPTOR: | |
# Write CRC and file sizes after the file data | |
fp.write(struct.pack("<LLL", info.CRC, info.compress_size, | |
info.file_size)) | |
# update position | |
fp.flush() | |
position = fp.tell() | |
elif info != zinfo: | |
# move to next position | |
position = position + info.compress_size + len(fileheader) + self._get_data_descriptor_size(info) | |
# Fix class members with state | |
self.start_dir = position | |
self._didModify = True | |
self.filelist.remove(zinfo) | |
del self.NameToInfo[zinfo.filename] | |
# write new central directory (includes truncate) | |
fp.seek(position, 0) | |
self._write_central_dir() | |
fp.seek(self.start_dir, 0) # jump to the beginning of the central directory, so it gets overridden at close() | |
def __del__(self): | |
"""Call the "close()" method in case the user forgot.""" | |
self.close() | |
def _central_dir_header(self, zinfo): | |
dt = zinfo.date_time | |
dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2] | |
dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2) | |
extra = [] | |
if zinfo.file_size > ZIP64_LIMIT \ | |
or zinfo.compress_size > ZIP64_LIMIT: | |
extra.append(zinfo.file_size) | |
extra.append(zinfo.compress_size) | |
file_size = 0xffffffff | |
compress_size = 0xffffffff | |
else: | |
file_size = zinfo.file_size | |
compress_size = zinfo.compress_size | |
if zinfo.header_offset > ZIP64_LIMIT: | |
extra.append(zinfo.header_offset) | |
header_offset = 0xffffffff | |
else: | |
header_offset = zinfo.header_offset | |
extra_data = zinfo.extra | |
if extra: | |
# Append a ZIP64 field to the extra's | |
extra_data = struct.pack( | |
'<HH' + 'Q'*len(extra), | |
1, 8*len(extra), *extra) + extra_data | |
extract_version = max(45, zinfo.extract_version) | |
create_version = max(45, zinfo.create_version) | |
else: | |
extract_version = zinfo.extract_version | |
create_version = zinfo.create_version | |
try: | |
filename, flag_bits = zinfo._encodeFilenameFlags() | |
centdir = struct.pack(structCentralDir, | |
stringCentralDir, create_version, | |
zinfo.create_system, extract_version, zinfo.reserved, | |
flag_bits, zinfo.compress_type, dostime, dosdate, | |
zinfo.CRC, compress_size, file_size, | |
len(filename), len(extra_data), len(zinfo.comment), | |
0, zinfo.internal_attr, zinfo.external_attr, | |
header_offset) | |
except DeprecationWarning: | |
print((structCentralDir, stringCentralDir, create_version, | |
zinfo.create_system, extract_version, zinfo.reserved, | |
zinfo.flag_bits, zinfo.compress_type, dostime, dosdate, | |
zinfo.CRC, compress_size, file_size, | |
len(zinfo.filename), len(extra_data), len(zinfo.comment), | |
0, zinfo.internal_attr, zinfo.external_attr, | |
header_offset), file=sys.stderr) | |
raise | |
return centdir + filename + extra_data + zinfo.comment | |
def _write_central_dir(self): | |
if self.fp is None: | |
return | |
if self.mode in ("w", "a"): # write ending records | |
count = 0 | |
pos1 = self.fp.tell() | |
for zinfo in self.filelist: # write central directory | |
count = count + 1 | |
self.fp.write(self._central_dir_header(zinfo)) | |
pos2 = self.fp.tell() | |
# Write end-of-zip-archive record | |
centDirCount = count | |
centDirSize = pos2 - pos1 | |
centDirOffset = pos1 | |
if (centDirCount >= ZIP_FILECOUNT_LIMIT or | |
centDirOffset > ZIP64_LIMIT or | |
centDirSize > ZIP64_LIMIT): | |
# Need to write the ZIP64 end-of-archive records | |
zip64endrec = struct.pack( | |
structEndArchive64, stringEndArchive64, | |
44, 45, 45, 0, 0, centDirCount, centDirCount, | |
centDirSize, centDirOffset) | |
self.fp.write(zip64endrec) | |
zip64locrec = struct.pack( | |
structEndArchive64Locator, | |
stringEndArchive64Locator, 0, pos2, 1) | |
self.fp.write(zip64locrec) | |
centDirCount = min(centDirCount, 0xFFFF) | |
centDirSize = min(centDirSize, 0xFFFFFFFF) | |
centDirOffset = min(centDirOffset, 0xFFFFFFFF) | |
# check for valid comment length | |
if len(self.comment) >= ZIP_MAX_COMMENT: | |
if self.debug > 0: | |
msg = 'Archive comment is too long; truncating to %d bytes' \ | |
% ZIP_MAX_COMMENT | |
self.comment = self.comment[:ZIP_MAX_COMMENT] | |
endrec = struct.pack(structEndArchive, stringEndArchive, | |
0, 0, centDirCount, centDirCount, | |
centDirSize, centDirOffset, len(self.comment)) | |
self.fp.write(endrec) | |
self.fp.write(self.comment) | |
self.fp.flush() | |
self.fp.truncate() | |
def close(self): | |
"""Close the file, and for mode "w" and "a" write the ending | |
records.""" | |
if self.fp is None: | |
return | |
if self.mode in ("w", "a") and self._didModify: # write ending records | |
self._write_central_dir() | |
if not self._filePassed: | |
self.fp.close() | |
self.fp = None | |
class PyZipFile(ZipFile): | |
"""Class to create ZIP archives with Python library files and packages.""" | |
def __init__(self, file, mode="r", compression=ZIP_STORED, | |
allowZip64=False, optimize=-1): | |
ZipFile.__init__(self, file, mode=mode, compression=compression, | |
allowZip64=allowZip64) | |
self._optimize = optimize | |
def writepy(self, pathname, basename=""): | |
"""Add all files from "pathname" to the ZIP archive. | |
If pathname is a package directory, search the directory and | |
all package subdirectories recursively for all *.py and enter | |
the modules into the archive. If pathname is a plain | |
directory, listdir *.py and enter all modules. Else, pathname | |
must be a Python *.py file and the module will be put into the | |
archive. Added modules are always module.pyo or module.pyc. | |
This method will compile the module.py into module.pyc if | |
necessary. | |
""" | |
dir, name = os.path.split(pathname) | |
if os.path.isdir(pathname): | |
initname = os.path.join(pathname, "__init__.py") | |
if os.path.isfile(initname): | |
# This is a package directory, add it | |
if basename: | |
basename = "%s/%s" % (basename, name) | |
else: | |
basename = name | |
if self.debug: | |
print(u"Adding package in", pathname, "as", basename) | |
fname, arcname = self._get_codename(initname[0:-3], basename) | |
if self.debug: | |
print(u"Adding", arcname) | |
self.write(fname, arcname) | |
dirlist = os.listdir(pathname) | |
dirlist.remove("__init__.py") | |
# Add all *.py files and package subdirectories | |
for filename in dirlist: | |
path = os.path.join(pathname, filename) | |
root, ext = os.path.splitext(filename) | |
if os.path.isdir(path): | |
if os.path.isfile(os.path.join(path, "__init__.py")): | |
# This is a package directory, add it | |
self.writepy(path, basename) # Recursive call | |
elif ext == ".py": | |
fname, arcname = self._get_codename(path[0:-3], | |
basename) | |
if self.debug: | |
print(u"Adding", arcname) | |
self.write(fname, arcname) | |
else: | |
# This is NOT a package directory, add its files at top level | |
if self.debug: | |
print("Adding files from directory", pathname) | |
for filename in os.listdir(pathname): | |
path = os.path.join(pathname, filename) | |
root, ext = os.path.splitext(filename) | |
if ext == ".py": | |
fname, arcname = self._get_codename(path[0:-3], | |
basename) | |
if self.debug: | |
print(u"Adding", arcname) | |
self.write(fname, arcname) | |
else: | |
if pathname[-3:] != ".py": | |
raise RuntimeError( | |
'Files added with writepy() must end with ".py"') | |
fname, arcname = self._get_codename(pathname[0:-3], basename) | |
if self.debug: | |
print(u"Adding file", arcname) | |
self.write(fname, arcname) | |
def _get_codename(self, pathname, basename): | |
"""Return (filename, archivename) for the path. | |
Given a module name path, return the correct file path and | |
archive name, compiling if necessary. For example, given | |
/python/lib/string, return (/python/lib/string.pyc, string). | |
""" | |
def _compile(file, optimize=-1): | |
import py_compile | |
if self.debug: | |
print(u"Compiling", file) | |
try: | |
if sys.version_info[0] >= 3 and sys.version_info[1] >= 2: | |
py_compile.compile(file, doraise=True, optimize=optimize) | |
else: | |
py_compile.compile(file, doraise=True) | |
except py_compile.PyCompileError as error: | |
print(error.msg) | |
return False | |
return True | |
file_py = pathname + ".py" | |
file_pyc = pathname + ".pyc" | |
file_pyo = pathname + ".pyo" | |
if hasattr(imp, 'cache_from_source'): | |
pycache_pyc = imp.cache_from_source(file_py, True) | |
pycache_pyo = imp.cache_from_source(file_py, False) | |
else: | |
pycache_pyc = file_pyc | |
pycache_pyo = file_pyo | |
if self._optimize == -1: | |
# legacy mode: use whatever file is present | |
if (os.path.isfile(file_pyo) and | |
os.stat(file_pyo).st_mtime >= os.stat(file_py).st_mtime): | |
# Use .pyo file. | |
arcname = fname = file_pyo | |
elif (os.path.isfile(file_pyc) and | |
os.stat(file_pyc).st_mtime >= os.stat(file_py).st_mtime): | |
# Use .pyc file. | |
arcname = fname = file_pyc | |
elif (os.path.isfile(pycache_pyc) and | |
os.stat(pycache_pyc).st_mtime >= os.stat(file_py).st_mtime): | |
# Use the __pycache__/*.pyc file, but write it to the legacy pyc | |
# file name in the archive. | |
fname = pycache_pyc | |
arcname = file_pyc | |
elif (os.path.isfile(pycache_pyo) and | |
os.stat(pycache_pyo).st_mtime >= os.stat(file_py).st_mtime): | |
# Use the __pycache__/*.pyo file, but write it to the legacy pyo | |
# file name in the archive. | |
fname = pycache_pyo | |
arcname = file_pyo | |
else: | |
# Compile py into PEP 3147 pyc file. | |
if _compile(file_py): | |
fname = (pycache_pyc if __debug__ else pycache_pyo) | |
arcname = (file_pyc if __debug__ else file_pyo) | |
else: | |
fname = arcname = file_py | |
else: | |
# new mode: use given optimization level | |
if self._optimize == 0: | |
fname = pycache_pyc | |
arcname = file_pyc | |
else: | |
fname = pycache_pyo | |
arcname = file_pyo | |
if not (os.path.isfile(fname) and | |
os.stat(fname).st_mtime >= os.stat(file_py).st_mtime): | |
if not _compile(file_py, optimize=self._optimize): | |
fname = arcname = file_py | |
archivename = os.path.split(arcname)[1] | |
if basename: | |
archivename = "%s/%s" % (basename, archivename) | |
return (fname, archivename) | |
def main(args = None): | |
import textwrap | |
USAGE=textwrap.dedent("""\ | |
Usage: | |
zipfile.py -l zipfile.zip # Show listing of a zipfile | |
zipfile.py -t zipfile.zip # Test if a zipfile is valid | |
zipfile.py -e zipfile.zip target # Extract zipfile into target dir | |
zipfile.py -c zipfile.zip src ... # Create zipfile from sources | |
""") | |
if args is None: | |
args = sys.argv[1:] | |
if not args or args[0] not in ('-l', '-c', '-e', '-t'): | |
print(USAGE) | |
sys.exit(1) | |
if args[0] == '-l': | |
if len(args) != 2: | |
print(USAGE) | |
sys.exit(1) | |
zf = ZipFile(args[1], 'r') | |
zf.printdir() | |
zf.close() | |
elif args[0] == '-t': | |
if len(args) != 2: | |
print(USAGE) | |
sys.exit(1) | |
zf = ZipFile(args[1], 'r') | |
badfile = zf.testzip() | |
if badfile: | |
print(u"The following enclosed file is corrupted: {!r}".format(badfile)) | |
print(u"Done testing") | |
elif args[0] == '-e': | |
if len(args) != 3: | |
print(USAGE) | |
sys.exit(1) | |
zf = ZipFile(args[1], 'r') | |
out = args[2] | |
for path in zf.namelist(): | |
if path.startswith('./'): | |
tgt = os.path.join(out, path[2:]) | |
else: | |
tgt = os.path.join(out, path) | |
tgtdir = os.path.dirname(tgt) | |
if not os.path.exists(tgtdir): | |
os.makedirs(tgtdir) | |
with open(tgt, 'wb') as fp: | |
fp.write(zf.read(path)) | |
zf.close() | |
elif args[0] == '-c': | |
if len(args) < 3: | |
print(USAGE) | |
sys.exit(1) | |
def addToZip(zf, path, zippath): | |
if os.path.isfile(path): | |
zf.write(path, zippath, ZIP_DEFLATED) | |
elif os.path.isdir(path): | |
for nm in os.listdir(path): | |
addToZip(zf, | |
os.path.join(path, nm), os.path.join(zippath, nm)) | |
# else: ignore | |
zf = ZipFile(args[1], 'w', allowZip64=True) | |
for src in args[2:]: | |
addToZip(zf, src, os.path.basename(src)) | |
zf.close() | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
modified version of the original Python zipfile unittest | |
Copyright (c) 2001-2016 Python Software Foundation; All Rights Reserved | |
""" | |
# 1. This LICENSE AGREEMENT is between the Python Software Foundation ("PSF"), and | |
# the Individual or Organization ("Licensee") accessing and otherwise using Python | |
# 2.7.11 software in source or binary form and its associated documentation. | |
# | |
# 2. Subject to the terms and conditions of this License Agreement, PSF hereby | |
# grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce, | |
# analyze, test, perform and/or display publicly, prepare derivative works, | |
# distribute, and otherwise use Python 2.7.11 alone or in any derivative | |
# version, provided, however, that PSF's License Agreement and PSF's notice of | |
# copyright, i.e., "Copyright (c) 2001-2016 Python Software Foundation; All Rights | |
# Reserved" are retained in Python 2.7.11 alone or in any derivative version | |
# prepared by Licensee. | |
# | |
# 3. In the event Licensee prepares a derivative work that is based on or | |
# incorporates Python 2.7.11 or any part thereof, and wants to make the | |
# derivative work available to others as provided herein, then Licensee hereby | |
# agrees to include in any such work a brief summary of the changes made to Python | |
# 2.7.11. | |
# | |
# 4. PSF is making Python 2.7.11 available to Licensee on an "AS IS" basis. | |
# PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED. BY WAY OF | |
# EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND DISCLAIMS ANY REPRESENTATION OR | |
# WARRANTY OF MERCHANTABILITY OR FITNESS FOR ANY PARTICULAR PURPOSE OR THAT THE | |
# USE OF PYTHON 2.7.11 WILL NOT INFRINGE ANY THIRD PARTY RIGHTS. | |
# | |
# 5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON 2.7.11 | |
# FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS A RESULT OF | |
# MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON 2.7.11, OR ANY DERIVATIVE | |
# THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF. | |
# | |
# 6. This License Agreement will automatically terminate upon a material breach of | |
# its terms and conditions. | |
# | |
# 7. Nothing in this License Agreement shall be deemed to create any relationship | |
# of agency, partnership, or joint venture between PSF and Licensee. This License | |
# Agreement does not grant permission to use PSF trademarks or trade name in a | |
# trademark sense to endorse or promote products or services of Licensee, or any | |
# third party. | |
# | |
# 8. By copying, installing or otherwise using Python 2.7.11, Licensee agrees | |
# to be bound by the terms and conditions of this License Agreement. | |
# We can test part of the module without zlib. | |
try: | |
import zlib | |
except ImportError: | |
zlib = None | |
import io | |
import os | |
import sys | |
import imp | |
import time | |
import shutil | |
import struct | |
import unittest | |
import combinearchive.custom_zip as zipfile | |
from tempfile import TemporaryFile | |
from random import randint, random | |
from unittest import skipUnless | |
from tests.custom_support import TESTFN, run_unittest, findfile, unlink | |
TESTFN2 = TESTFN + "2" | |
TESTFNDIR = TESTFN + "d" | |
FIXEDTEST_SIZE = 1000 | |
DATAFILES_DIR = 'zipfile_datafiles' | |
SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), | |
('ziptest2dir/_ziptest2', 'qawsedrftg'), | |
('/ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), | |
('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] | |
class TestsWithSourceFile(unittest.TestCase): | |
def setUp(self): | |
self.line_gen = (bytes("Zipfile test line %d. random float: %f" % | |
(i, random())).encode("ascii") | |
for i in range(FIXEDTEST_SIZE)) | |
self.data = b'\n'.join(self.line_gen) + b'\n' | |
# Make a source file with some lines | |
with open(TESTFN, "wb") as fp: | |
fp.write(self.data) | |
def make_test_archive(self, f, compression): | |
# Create the ZIP archive | |
with zipfile.ZipFile(f, "w", compression) as zipfp: | |
zipfp.write(TESTFN, "another.name") | |
zipfp.write(TESTFN, TESTFN) | |
zipfp.writestr("strfile", self.data) | |
def zip_test(self, f, compression): | |
self.make_test_archive(f, compression) | |
# Read the ZIP archive | |
with zipfile.ZipFile(f, "r", compression) as zipfp: | |
self.assertEqual(zipfp.read(TESTFN), self.data) | |
self.assertEqual(zipfp.read("another.name"), self.data) | |
self.assertEqual(zipfp.read("strfile"), self.data) | |
# Print the ZIP directory | |
fp = io.StringIO() | |
zipfp.printdir(file=fp) | |
directory = fp.getvalue() | |
lines = directory.splitlines() | |
self.assertEqual(len(lines), 4) # Number of files + header | |
self.assertIn('File Name', lines[0]) | |
self.assertIn('Modified', lines[0]) | |
self.assertIn('Size', lines[0]) | |
fn, date, time_, size = lines[1].split() | |
self.assertEqual(fn, 'another.name') | |
self.assertTrue(time.strptime(date, '%Y-%m-%d')) | |
self.assertTrue(time.strptime(time_, '%H:%M:%S')) | |
self.assertEqual(size, str(len(self.data))) | |
# Check the namelist | |
names = zipfp.namelist() | |
self.assertEqual(len(names), 3) | |
self.assertIn(TESTFN, names) | |
self.assertIn("another.name", names) | |
self.assertIn("strfile", names) | |
# Check infolist | |
infos = zipfp.infolist() | |
names = [i.filename for i in infos] | |
self.assertEqual(len(names), 3) | |
self.assertIn(TESTFN, names) | |
self.assertIn("another.name", names) | |
self.assertIn("strfile", names) | |
for i in infos: | |
self.assertEqual(i.file_size, len(self.data)) | |
# check getinfo | |
for nm in (TESTFN, "another.name", "strfile"): | |
info = zipfp.getinfo(nm) | |
self.assertEqual(info.filename, nm) | |
self.assertEqual(info.file_size, len(self.data)) | |
# Check that testzip doesn't raise an exception | |
zipfp.testzip() | |
if not isinstance(f, str): | |
f.close() | |
def test_stored(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_test(f, zipfile.ZIP_STORED) | |
def zip_open_test(self, f, compression): | |
self.make_test_archive(f, compression) | |
# Read the ZIP archive | |
with zipfile.ZipFile(f, "r", compression) as zipfp: | |
zipdata1 = [] | |
with zipfp.open(TESTFN) as zipopen1: | |
while True: | |
read_data = zipopen1.read(256) | |
if not read_data: | |
break | |
zipdata1.append(read_data) | |
zipdata2 = [] | |
with zipfp.open("another.name") as zipopen2: | |
while True: | |
read_data = zipopen2.read(256) | |
if not read_data: | |
break | |
zipdata2.append(read_data) | |
self.assertEqual(b''.join(zipdata1), self.data) | |
self.assertEqual(b''.join(zipdata2), self.data) | |
if not isinstance(f, str): | |
f.close() | |
def test_open_stored(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_open_test(f, zipfile.ZIP_STORED) | |
def test_open_via_zip_info(self): | |
# Create the ZIP archive | |
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: | |
zipfp.writestr("name", "foo") | |
zipfp.writestr("name", "bar") | |
with zipfile.ZipFile(TESTFN2, "r") as zipfp: | |
infos = zipfp.infolist() | |
data = b"" | |
for info in infos: | |
with zipfp.open(info) as zipopen: | |
data += zipopen.read() | |
self.assertTrue(data == b"foobar" or data == b"barfoo") | |
data = b"" | |
for info in infos: | |
data += zipfp.read(info) | |
self.assertTrue(data == b"foobar" or data == b"barfoo") | |
def zip_random_open_test(self, f, compression): | |
self.make_test_archive(f, compression) | |
# Read the ZIP archive | |
with zipfile.ZipFile(f, "r", compression) as zipfp: | |
zipdata1 = [] | |
with zipfp.open(TESTFN) as zipopen1: | |
while True: | |
read_data = zipopen1.read(randint(1, 1024)) | |
if not read_data: | |
break | |
zipdata1.append(read_data) | |
self.assertEqual(b''.join(zipdata1), self.data) | |
if not isinstance(f, str): | |
f.close() | |
def test_random_open_stored(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_random_open_test(f, zipfile.ZIP_STORED) | |
def test_univeral_readaheads(self): | |
f = io.BytesIO() | |
data = b'a\r\n' * 16 * 1024 | |
zipfp = zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED) | |
zipfp.writestr(TESTFN, data) | |
zipfp.close() | |
data2 = b'' | |
zipfp = zipfile.ZipFile(f, 'r') | |
with zipfp.open(TESTFN, 'rU') as zipopen: | |
for line in zipopen: | |
data2 += line | |
zipfp.close() | |
self.assertEqual(data, data2.replace(b'\n', b'\r\n')) | |
def zip_readline_read_test(self, f, compression): | |
self.make_test_archive(f, compression) | |
# Read the ZIP archive | |
zipfp = zipfile.ZipFile(f, "r") | |
with zipfp.open(TESTFN) as zipopen: | |
data = b'' | |
while True: | |
read = zipopen.readline() | |
if not read: | |
break | |
data += read | |
read = zipopen.read(100) | |
if not read: | |
break | |
data += read | |
self.assertEqual(data, self.data) | |
zipfp.close() | |
if not isinstance(f, str): | |
f.close() | |
def zip_readline_test(self, f, compression): | |
self.make_test_archive(f, compression) | |
# Read the ZIP archive | |
with zipfile.ZipFile(f, "r") as zipfp: | |
with zipfp.open(TESTFN) as zipopen: | |
for line in self.line_gen: | |
linedata = zipopen.readline() | |
self.assertEqual(linedata, line + '\n') | |
if not isinstance(f, str): | |
f.close() | |
def zip_readlines_test(self, f, compression): | |
self.make_test_archive(f, compression) | |
# Read the ZIP archive | |
with zipfile.ZipFile(f, "r") as zipfp: | |
with zipfp.open(TESTFN) as zipopen: | |
ziplines = zipopen.readlines() | |
for line, zipline in zip(self.line_gen, ziplines): | |
self.assertEqual(zipline, line + '\n') | |
if not isinstance(f, str): | |
f.close() | |
def zip_iterlines_test(self, f, compression): | |
self.make_test_archive(f, compression) | |
# Read the ZIP archive | |
with zipfile.ZipFile(f, "r") as zipfp: | |
with zipfp.open(TESTFN) as zipopen: | |
for line, zipline in zip(self.line_gen, zipopen): | |
self.assertEqual(zipline, line + '\n') | |
if not isinstance(f, str): | |
f.close() | |
def test_readline_read_stored(self): | |
# Issue #7610: calls to readline() interleaved with calls to read(). | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_readline_read_test(f, zipfile.ZIP_STORED) | |
def test_readline_stored(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_readline_test(f, zipfile.ZIP_STORED) | |
def test_readlines_stored(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_readlines_test(f, zipfile.ZIP_STORED) | |
def test_iterlines_stored(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_iterlines_test(f, zipfile.ZIP_STORED) | |
@skipUnless(zlib, "requires zlib") | |
def test_deflated(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_test(f, zipfile.ZIP_DEFLATED) | |
@skipUnless(zlib, "requires zlib") | |
def test_open_deflated(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_open_test(f, zipfile.ZIP_DEFLATED) | |
@skipUnless(zlib, "requires zlib") | |
def test_random_open_deflated(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_random_open_test(f, zipfile.ZIP_DEFLATED) | |
@skipUnless(zlib, "requires zlib") | |
def test_readline_read_deflated(self): | |
# Issue #7610: calls to readline() interleaved with calls to read(). | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_readline_read_test(f, zipfile.ZIP_DEFLATED) | |
@skipUnless(zlib, "requires zlib") | |
def test_readline_deflated(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_readline_test(f, zipfile.ZIP_DEFLATED) | |
@skipUnless(zlib, "requires zlib") | |
def test_readlines_deflated(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_readlines_test(f, zipfile.ZIP_DEFLATED) | |
@skipUnless(zlib, "requires zlib") | |
def test_iterlines_deflated(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_iterlines_test(f, zipfile.ZIP_DEFLATED) | |
@skipUnless(zlib, "requires zlib") | |
def test_low_compression(self): | |
"""Check for cases where compressed data is larger than original.""" | |
# Create the ZIP archive | |
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp: | |
zipfp.writestr("strfile", '12') | |
# Get an open object for strfile | |
with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp: | |
with zipfp.open("strfile") as openobj: | |
self.assertEqual(openobj.read(1), b'1') | |
self.assertEqual(openobj.read(1), b'2') | |
def test_absolute_arcnames(self): | |
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: | |
zipfp.write(TESTFN, "/absolute") | |
with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: | |
self.assertEqual(zipfp.namelist(), ["absolute"]) | |
def test_append_to_zip_file(self): | |
"""Test appending to an existing zipfile.""" | |
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: | |
zipfp.write(TESTFN, TESTFN) | |
with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: | |
zipfp.writestr("strfile", self.data) | |
self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) | |
def test_append_to_non_zip_file(self): | |
"""Test appending to an existing file that is not a zipfile.""" | |
# NOTE: this test fails if len(d) < 22 because of the first | |
# line "fpin.seek(-22, 2)" in _EndRecData | |
data = b'I am not a ZipFile!' * 10 | |
with open(TESTFN2, 'wb') as f: | |
f.write(data) | |
with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: | |
zipfp.write(TESTFN, TESTFN) | |
with open(TESTFN2, 'rb') as f: | |
f.seek(len(data)) | |
with zipfile.ZipFile(f, "r") as zipfp: | |
self.assertEqual(zipfp.namelist(), [TESTFN]) | |
def test_write_default_name(self): | |
"""Check that calling ZipFile.write without arcname specified | |
produces the expected result.""" | |
with zipfile.ZipFile(TESTFN2, "w") as zipfp: | |
zipfp.write(TESTFN) | |
with open(TESTFN, "rb") as f: | |
self.assertEqual(zipfp.read(TESTFN), f.read()) | |
@skipUnless(zlib, "requires zlib") | |
def test_per_file_compression(self): | |
"""Check that files within a Zip archive can have different | |
compression options.""" | |
with zipfile.ZipFile(TESTFN2, "w") as zipfp: | |
zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) | |
zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) | |
sinfo = zipfp.getinfo('storeme') | |
dinfo = zipfp.getinfo('deflateme') | |
self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) | |
self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) | |
def test_write_to_readonly(self): | |
"""Check that trying to call write() on a readonly ZipFile object | |
raises a RuntimeError.""" | |
with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: | |
zipfp.writestr("somefile.txt", "bogus") | |
with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: | |
self.assertRaises(RuntimeError, zipfp.write, TESTFN) | |
def test_extract(self): | |
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: | |
for fpath, fdata in SMALL_TEST_DATA: | |
zipfp.writestr(fpath, fdata) | |
with zipfile.ZipFile(TESTFN2, "r") as zipfp: | |
for fpath, fdata in SMALL_TEST_DATA: | |
writtenfile = zipfp.extract(fpath) | |
# make sure it was written to the right place | |
if os.path.isabs(fpath): | |
correctfile = os.path.join(os.getcwd(), fpath[1:]) | |
else: | |
correctfile = os.path.join(os.getcwd(), fpath) | |
correctfile = os.path.normpath(correctfile) | |
self.assertEqual(writtenfile, correctfile) | |
# make sure correct data is in correct file | |
with open(writtenfile, "rb") as f: | |
self.assertEqual(fdata.encode(), f.read()) | |
os.remove(writtenfile) | |
# remove the test file subdirectories | |
shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) | |
def test_extract_all(self): | |
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: | |
for fpath, fdata in SMALL_TEST_DATA: | |
zipfp.writestr(fpath, fdata) | |
with zipfile.ZipFile(TESTFN2, "r") as zipfp: | |
zipfp.extractall() | |
for fpath, fdata in SMALL_TEST_DATA: | |
if os.path.isabs(fpath): | |
outfile = os.path.join(os.getcwd(), fpath[1:]) | |
else: | |
outfile = os.path.join(os.getcwd(), fpath) | |
with open(outfile, "rb") as f: | |
self.assertEqual(fdata.encode(), f.read()) | |
os.remove(outfile) | |
# remove the test file subdirectories | |
shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) | |
def test_writestr_compression(self): | |
zipfp = zipfile.ZipFile(TESTFN2, "w") | |
zipfp.writestr("a.txt", "hello world", compress_type=zipfile.ZIP_STORED) | |
if zlib: | |
zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_DEFLATED) | |
info = zipfp.getinfo('a.txt') | |
self.assertEqual(info.compress_type, zipfile.ZIP_STORED) | |
if zlib: | |
info = zipfp.getinfo('b.txt') | |
self.assertEqual(info.compress_type, zipfile.ZIP_DEFLATED) | |
def zip_test_writestr_permissions(self, f, compression): | |
# Make sure that writestr creates files with mode 0600, | |
# when it is passed a name rather than a ZipInfo instance. | |
self.make_test_archive(f, compression) | |
with zipfile.ZipFile(f, "r") as zipfp: | |
zinfo = zipfp.getinfo('strfile') | |
self.assertEqual(zinfo.external_attr, 0o600 << 16) | |
if not isinstance(f, str): | |
f.close() | |
def test_writestr_permissions(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) | |
def test_writestr_extended_local_header_issue1202(self): | |
with zipfile.ZipFile(TESTFN2, 'w') as orig_zip: | |
for data in 'abcdefghijklmnop': | |
zinfo = zipfile.ZipInfo(data) | |
zinfo.flag_bits |= 0x08 # Include an extended local header. | |
orig_zip.writestr(zinfo, data) | |
def test_close(self): | |
"""Check that the zipfile is closed after the 'with' block.""" | |
with zipfile.ZipFile(TESTFN2, "w") as zipfp: | |
for fpath, fdata in SMALL_TEST_DATA: | |
zipfp.writestr(fpath, fdata) | |
self.assertTrue(zipfp.fp is not None, 'zipfp is not open') | |
self.assertTrue(zipfp.fp is None, 'zipfp is not closed') | |
with zipfile.ZipFile(TESTFN2, "r") as zipfp: | |
self.assertTrue(zipfp.fp is not None, 'zipfp is not open') | |
self.assertTrue(zipfp.fp is None, 'zipfp is not closed') | |
def test_close_on_exception(self): | |
"""Check that the zipfile is closed if an exception is raised in the | |
'with' block.""" | |
with zipfile.ZipFile(TESTFN2, "w") as zipfp: | |
for fpath, fdata in SMALL_TEST_DATA: | |
zipfp.writestr(fpath, fdata) | |
try: | |
with zipfile.ZipFile(TESTFN2, "r") as zipfp2: | |
raise zipfile.BadZipFile() | |
except zipfile.BadZipFile: | |
self.assertTrue(zipfp2.fp is None, 'zipfp is not closed') | |
def test_unicode_filenames(self): | |
# bug #10801 | |
fname = findfile('zip_cp437_header.zip') | |
with zipfile.ZipFile(fname) as zipfp: | |
for name in zipfp.namelist(): | |
zipfp.open(name).close() | |
def tearDown(self): | |
unlink(TESTFN) | |
unlink(TESTFN2) | |
class TestZip64InSmallFiles(unittest.TestCase): | |
# These tests test the ZIP64 functionality without using large files, | |
# see test_zipfile64 for proper tests. | |
def setUp(self): | |
line_gen = (bytes("Test of zipfile line %d." % i).encode("ascii") | |
for i in range(0, FIXEDTEST_SIZE)) | |
self.data = b'\n'.join(line_gen) | |
# Make a source file with some lines | |
with open(TESTFN, "wb") as fp: | |
fp.write(self.data) | |
# modify ZIP64_LIMIT last, so if write fails it won't affect the other tests | |
self._limit = zipfile.ZIP64_LIMIT | |
zipfile.ZIP64_LIMIT = 5 | |
def large_file_exception_test(self, f, compression): | |
with zipfile.ZipFile(f, "w", compression) as zipfp: | |
self.assertRaises(zipfile.LargeZipFile, | |
zipfp.write, TESTFN, "another.name") | |
def large_file_exception_test2(self, f, compression): | |
with zipfile.ZipFile(f, "w", compression) as zipfp: | |
self.assertRaises(zipfile.LargeZipFile, | |
zipfp.writestr, "another.name", self.data) | |
if not isinstance(f, str): | |
f.close() | |
def test_large_file_exception(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.large_file_exception_test(f, zipfile.ZIP_STORED) | |
self.large_file_exception_test2(f, zipfile.ZIP_STORED) | |
def zip_test(self, f, compression): | |
# Create the ZIP archive | |
with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: | |
zipfp.write(TESTFN, "another.name") | |
zipfp.write(TESTFN, TESTFN) | |
zipfp.writestr("strfile", self.data) | |
# Read the ZIP archive | |
with zipfile.ZipFile(f, "r", compression) as zipfp: | |
self.assertEqual(zipfp.read(TESTFN), self.data) | |
self.assertEqual(zipfp.read("another.name"), self.data) | |
self.assertEqual(zipfp.read("strfile"), self.data) | |
# Print the ZIP directory | |
fp = io.StringIO() | |
zipfp.printdir(fp) | |
directory = fp.getvalue() | |
lines = directory.splitlines() | |
self.assertEqual(len(lines), 4) # Number of files + header | |
self.assertIn('File Name', lines[0]) | |
self.assertIn('Modified', lines[0]) | |
self.assertIn('Size', lines[0]) | |
fn, date, time_, size = lines[1].split() | |
self.assertEqual(fn, 'another.name') | |
self.assertTrue(time.strptime(date, '%Y-%m-%d')) | |
self.assertTrue(time.strptime(time_, '%H:%M:%S')) | |
self.assertEqual(size, str(len(self.data))) | |
# Check the namelist | |
names = zipfp.namelist() | |
self.assertEqual(len(names), 3) | |
self.assertIn(TESTFN, names) | |
self.assertIn("another.name", names) | |
self.assertIn("strfile", names) | |
# Check infolist | |
infos = zipfp.infolist() | |
names = [i.filename for i in infos] | |
self.assertEqual(len(names), 3) | |
self.assertIn(TESTFN, names) | |
self.assertIn("another.name", names) | |
self.assertIn("strfile", names) | |
for i in infos: | |
self.assertEqual(i.file_size, len(self.data)) | |
# check getinfo | |
for nm in (TESTFN, "another.name", "strfile"): | |
info = zipfp.getinfo(nm) | |
self.assertEqual(info.filename, nm) | |
self.assertEqual(info.file_size, len(self.data)) | |
# Check that testzip doesn't raise an exception | |
zipfp.testzip() | |
if not isinstance(f, str): | |
f.close() | |
def test_stored(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_test(f, zipfile.ZIP_STORED) | |
@skipUnless(zlib, "requires zlib") | |
def test_deflated(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_test(f, zipfile.ZIP_DEFLATED) | |
def test_absolute_arcnames(self): | |
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, | |
allowZip64=True) as zipfp: | |
zipfp.write(TESTFN, "/absolute") | |
with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: | |
self.assertEqual(zipfp.namelist(), ["absolute"]) | |
def tearDown(self): | |
zipfile.ZIP64_LIMIT = self._limit | |
unlink(TESTFN) | |
unlink(TESTFN2) | |
class PyZipFileTests(unittest.TestCase): | |
def test_write_pyfile(self): | |
with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: | |
fn = __file__ | |
if fn.endswith('.pyc') or fn.endswith('.pyo'): | |
path_split = fn.split(os.sep) | |
if os.altsep is not None: | |
path_split.extend(fn.split(os.altsep)) | |
if '__pycache__' in path_split: | |
fn = imp.source_from_cache(fn) | |
else: | |
fn = fn[:-1] | |
zipfp.writepy(fn) | |
bn = os.path.basename(fn) | |
self.assertNotIn(bn, zipfp.namelist()) | |
self.assertTrue(bn + 'o' in zipfp.namelist() or | |
bn + 'c' in zipfp.namelist()) | |
with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: | |
fn = __file__ | |
if fn.endswith(('.pyc', '.pyo')): | |
fn = fn[:-1] | |
zipfp.writepy(fn, "testpackage") | |
bn = "%s/%s" % ("testpackage", os.path.basename(fn)) | |
self.assertNotIn(bn, zipfp.namelist()) | |
self.assertTrue(bn + 'o' in zipfp.namelist() or | |
bn + 'c' in zipfp.namelist()) | |
def test_write_python_package(self): | |
import email | |
packagedir = os.path.dirname(email.__file__) | |
with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: | |
zipfp.writepy(packagedir) | |
# Check for a couple of modules at different levels of the | |
# hierarchy | |
names = zipfp.namelist() | |
self.assertTrue('email/__init__.pyo' in names or | |
'email/__init__.pyc' in names) | |
self.assertTrue('email/mime/text.pyo' in names or | |
'email/mime/text.pyc' in names) | |
def test_write_with_optimization(self): | |
import email | |
packagedir = os.path.dirname(email.__file__) | |
# use .pyc if running test in optimization mode, | |
# use .pyo if running test in debug mode | |
optlevel = 1 if __debug__ else 0 | |
ext = '.pyo' if optlevel == 1 else '.pyc' | |
try: | |
with TemporaryFile() as t, \ | |
zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp: | |
zipfp.writepy(packagedir) | |
names = zipfp.namelist() | |
self.assertIn('email/__init__' + ext, names) | |
self.assertIn('email/mime/text' + ext, names) | |
except IOError: | |
print "\n Cannot execute test_write_with_optimization due to insufficient read permissions for %s" \ | |
% packagedir | |
def test_write_python_directory(self): | |
os.mkdir(TESTFN2) | |
try: | |
with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: | |
fp.write("print(42)\n") | |
with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp: | |
fp.write("print(42 * 42)\n") | |
with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp: | |
fp.write("bla bla bla\n") | |
with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: | |
zipfp.writepy(TESTFN2) | |
names = zipfp.namelist() | |
self.assertTrue('mod1.pyc' in names or 'mod1.pyo' in names) | |
self.assertTrue('mod2.pyc' in names or 'mod2.pyo' in names) | |
self.assertNotIn('mod2.txt', names) | |
finally: | |
shutil.rmtree(TESTFN2) | |
def test_write_non_pyfile(self): | |
with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: | |
with open(TESTFN, 'w') as f: | |
f.write('most definitely not a python file') | |
self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) | |
os.remove(TESTFN) | |
class OtherTests(unittest.TestCase): | |
zips_with_bad_crc = { | |
zipfile.ZIP_STORED: ( | |
b'PK\003\004\024\0\0\0\0\0 \213\212;:r' | |
b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' | |
b'ilehello,AworldP' | |
b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' | |
b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' | |
b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' | |
b'lePK\005\006\0\0\0\0\001\0\001\0003\000' | |
b'\0\0/\0\0\0\0\0'), | |
zipfile.ZIP_DEFLATED: ( | |
b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' | |
b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' | |
b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' | |
b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' | |
b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' | |
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' | |
b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' | |
b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00'), | |
} | |
def test_unicode_filenames(self): | |
with zipfile.ZipFile(TESTFN, "w") as zf: | |
zf.writestr(u"foo.txt", "Test for unicode filename") | |
zf.writestr(u"\xf6.txt", "Test for unicode filename") | |
self.assertIsInstance(zf.infolist()[0].filename, str if sys.version_info[0] >= 3 else unicode) | |
with zipfile.ZipFile(TESTFN, "r") as zf: | |
self.assertEqual(zf.filelist[0].filename, u"foo.txt") | |
self.assertEqual(zf.filelist[1].filename, u"\xf6.txt") | |
def test_create_non_existent_file_for_append(self): | |
if os.path.exists(TESTFN): | |
os.unlink(TESTFN) | |
filename = 'testfile.txt' | |
content = b'hello, world. this is some content.' | |
try: | |
with zipfile.ZipFile(TESTFN, 'a') as zf: | |
zf.writestr(filename, content) | |
except IOError: | |
self.fail('Could not append data to a non-existent zip file.') | |
self.assertTrue(os.path.exists(TESTFN)) | |
with zipfile.ZipFile(TESTFN, 'r') as zf: | |
self.assertEqual(zf.read(filename), content) | |
def test_close_erroneous_file(self): | |
# This test checks that the ZipFile constructor closes the file object | |
# it opens if there's an error in the file. If it doesn't, the | |
# traceback holds a reference to the ZipFile object and, indirectly, | |
# the file object. | |
# On Windows, this causes the os.unlink() call to fail because the | |
# underlying file is still open. This is SF bug #412214. | |
# | |
with open(TESTFN, "w") as fp: | |
fp.write("this is not a legal zip file\n") | |
try: | |
zf = zipfile.ZipFile(TESTFN) | |
except zipfile.BadZipFile: | |
pass | |
def test_is_zip_erroneous_file(self): | |
"""Check that is_zipfile() correctly identifies non-zip files.""" | |
# - passing a filename | |
with open(TESTFN, "w") as fp: | |
fp.write("this is not a legal zip file\n") | |
chk = zipfile.is_zipfile(TESTFN) | |
self.assertFalse(chk) | |
# - passing a file object | |
with open(TESTFN, "rb") as fp: | |
chk = zipfile.is_zipfile(fp) | |
self.assertTrue(not chk) | |
# - passing a file-like object | |
fp = io.BytesIO() | |
fp.write(b"this is not a legal zip file\n") | |
chk = zipfile.is_zipfile(fp) | |
self.assertTrue(not chk) | |
fp.seek(0, 0) | |
chk = zipfile.is_zipfile(fp) | |
self.assertTrue(not chk) | |
def test_is_zip_valid_file(self): | |
"""Check that is_zipfile() correctly identifies zip files.""" | |
# - passing a filename | |
with zipfile.ZipFile(TESTFN, mode="w") as zipf: | |
zipf.writestr("foo.txt", b"O, for a Muse of Fire!") | |
chk = zipfile.is_zipfile(TESTFN) | |
self.assertTrue(chk) | |
# - passing a file object | |
with open(TESTFN, "rb") as fp: | |
chk = zipfile.is_zipfile(fp) | |
self.assertTrue(chk) | |
fp.seek(0, 0) | |
zip_contents = fp.read() | |
# - passing a file-like object | |
fp = io.BytesIO() | |
fp.write(zip_contents) | |
chk = zipfile.is_zipfile(fp) | |
self.assertTrue(chk) | |
fp.seek(0, 0) | |
chk = zipfile.is_zipfile(fp) | |
self.assertTrue(chk) | |
def test_non_existent_file_raises_IOError(self): | |
# make sure we don't raise an AttributeError when a partially-constructed | |
# ZipFile instance is finalized; this tests for regression on SF tracker | |
# bug #403871. | |
# The bug we're testing for caused an AttributeError to be raised | |
# when a ZipFile instance was created for a file that did not | |
# exist; the .fp member was not initialized but was needed by the | |
# __del__() method. Since the AttributeError is in the __del__(), | |
# it is ignored, but the user should be sufficiently annoyed by | |
# the message on the output that regression will be noticed | |
# quickly. | |
self.assertRaises(IOError, zipfile.ZipFile, TESTFN) | |
def test_empty_file_raises_BadZipFile(self): | |
f = open(TESTFN, 'w') | |
f.close() | |
self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN) | |
with open(TESTFN, 'w') as fp: | |
fp.write("short file") | |
self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN) | |
def test_closed_zip_raises_RuntimeError(self): | |
"""Verify that testzip() doesn't swallow inappropriate exceptions.""" | |
data = io.BytesIO() | |
with zipfile.ZipFile(data, mode="w") as zipf: | |
zipf.writestr("foo.txt", "O, for a Muse of Fire!") | |
# This is correct; calling .read on a closed ZipFile should throw | |
# a RuntimeError, and so should calling .testzip. An earlier | |
# version of .testzip would swallow this exception (and any other) | |
# and report that the first file in the archive was corrupt. | |
self.assertRaises(RuntimeError, zipf.read, "foo.txt") | |
self.assertRaises(RuntimeError, zipf.open, "foo.txt") | |
self.assertRaises(RuntimeError, zipf.testzip) | |
self.assertRaises(RuntimeError, zipf.writestr, "bogus.txt", "bogus") | |
with open(TESTFN, 'w') as f: | |
f.write('zipfile test data') | |
self.assertRaises(RuntimeError, zipf.write, TESTFN) | |
def test_bad_constructor_mode(self): | |
"""Check that bad modes passed to ZipFile constructor are caught.""" | |
self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "q") | |
def test_bad_open_mode(self): | |
"""Check that bad modes passed to ZipFile.open are caught.""" | |
with zipfile.ZipFile(TESTFN, mode="w") as zipf: | |
zipf.writestr("foo.txt", "O, for a Muse of Fire!") | |
with zipfile.ZipFile(TESTFN, mode="r") as zipf: | |
# read the data to make sure the file is there | |
zipf.read("foo.txt") | |
self.assertRaises(RuntimeError, zipf.open, "foo.txt", "q") | |
def test_read0(self): | |
"""Check that calling read(0) on a ZipExtFile object returns an empty | |
string and doesn't advance file pointer.""" | |
with zipfile.ZipFile(TESTFN, mode="w") as zipf: | |
zipf.writestr("foo.txt", "O, for a Muse of Fire!") | |
# read the data to make sure the file is there | |
with zipf.open("foo.txt") as f: | |
for i in range(FIXEDTEST_SIZE): | |
self.assertEqual(f.read(0), b'') | |
self.assertEqual(f.read(), b"O, for a Muse of Fire!") | |
def test_open_non_existent_item(self): | |
"""Check that attempting to call open() for an item that doesn't | |
exist in the archive raises a RuntimeError.""" | |
with zipfile.ZipFile(TESTFN, mode="w") as zipf: | |
self.assertRaises(KeyError, zipf.open, "foo.txt", "r") | |
def test_bad_compression_mode(self): | |
"""Check that bad compression methods passed to ZipFile.open are | |
caught.""" | |
self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "w", -1) | |
def test_null_byte_in_filename(self): | |
"""Check that a filename containing a null byte is properly | |
terminated.""" | |
with zipfile.ZipFile(TESTFN, mode="w") as zipf: | |
zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!") | |
self.assertEqual(zipf.namelist(), ['foo.txt']) | |
def test_struct_sizes(self): | |
"""Check that ZIP internal structure sizes are calculated correctly.""" | |
self.assertEqual(zipfile.sizeEndCentDir, 22) | |
self.assertEqual(zipfile.sizeCentralDir, 46) | |
self.assertEqual(zipfile.sizeEndCentDir64, 56) | |
self.assertEqual(zipfile.sizeEndCentDir64Locator, 20) | |
def test_comments(self): | |
"""Check that comments on the archive are handled properly.""" | |
# check default comment is empty | |
with zipfile.ZipFile(TESTFN, mode="w") as zipf: | |
self.assertEqual(zipf.comment, b'') | |
zipf.writestr("foo.txt", "O, for a Muse of Fire!") | |
with zipfile.ZipFile(TESTFN, mode="r") as zipfr: | |
self.assertEqual(zipfr.comment, b'') | |
# check a simple short comment | |
comment = b'Bravely taking to his feet, he beat a very brave retreat.' | |
with zipfile.ZipFile(TESTFN, mode="w") as zipf: | |
zipf.comment = comment | |
zipf.writestr("foo.txt", "O, for a Muse of Fire!") | |
with zipfile.ZipFile(TESTFN, mode="r") as zipfr: | |
self.assertEqual(zipf.comment, comment) | |
# check a comment of max length | |
comment2 = ''.join(['%d' % (i ** 3 % 10) for i in range((1 << 16) - 1)]) | |
comment2 = comment2.encode("ascii") | |
with zipfile.ZipFile(TESTFN, mode="w") as zipf: | |
zipf.comment = comment2 | |
zipf.writestr("foo.txt", "O, for a Muse of Fire!") | |
with zipfile.ZipFile(TESTFN, mode="r") as zipfr: | |
self.assertEqual(zipfr.comment, comment2) | |
# check a comment that is too long is truncated | |
with zipfile.ZipFile(TESTFN, mode="w") as zipf: | |
zipf.comment = comment2 + b'oops' | |
zipf.writestr("foo.txt", "O, for a Muse of Fire!") | |
with zipfile.ZipFile(TESTFN, mode="r") as zipfr: | |
self.assertEqual(zipfr.comment, comment2) | |
def check_testzip_with_bad_crc(self, compression): | |
"""Tests that files with bad CRCs return their name from testzip.""" | |
zipdata = self.zips_with_bad_crc[compression] | |
with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: | |
# testzip returns the name of the first corrupt file, or None | |
self.assertEqual('afile', zipf.testzip()) | |
def test_testzip_with_bad_crc_stored(self): | |
self.check_testzip_with_bad_crc(zipfile.ZIP_STORED) | |
@skipUnless(zlib, "requires zlib") | |
def test_testzip_with_bad_crc_deflated(self): | |
self.check_testzip_with_bad_crc(zipfile.ZIP_DEFLATED) | |
def check_read_with_bad_crc(self, compression): | |
"""Tests that files with bad CRCs raise a BadZipFile exception when read.""" | |
zipdata = self.zips_with_bad_crc[compression] | |
# Using ZipFile.read() | |
with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: | |
self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile') | |
# Using ZipExtFile.read() | |
with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: | |
with zipf.open('afile', 'r') as corrupt_file: | |
self.assertRaises(zipfile.BadZipFile, corrupt_file.read) | |
# Same with small reads (in order to exercise the buffering logic) | |
with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: | |
with zipf.open('afile', 'r') as corrupt_file: | |
corrupt_file.MIN_READ_SIZE = 2 | |
with self.assertRaises(zipfile.BadZipFile): | |
while corrupt_file.read(2): | |
pass | |
def test_read_with_bad_crc_stored(self): | |
self.check_read_with_bad_crc(zipfile.ZIP_STORED) | |
@skipUnless(zlib, "requires zlib") | |
def test_read_with_bad_crc_deflated(self): | |
self.check_read_with_bad_crc(zipfile.ZIP_DEFLATED) | |
def check_read_return_size(self, compression): | |
# Issue #9837: ZipExtFile.read() shouldn't return more bytes | |
# than requested. | |
for test_size in (1, 4095, 4096, 4097, 16384): | |
file_size = test_size + 1 | |
junk = b''.join(struct.pack('B', randint(0, 255)) | |
for x in range(file_size)) | |
with zipfile.ZipFile(io.BytesIO(), "w", compression) as zipf: | |
zipf.writestr('foo', junk) | |
with zipf.open('foo', 'r') as fp: | |
buf = fp.read(test_size) | |
self.assertEqual(len(buf), test_size) | |
def test_read_return_size_stored(self): | |
self.check_read_return_size(zipfile.ZIP_STORED) | |
@skipUnless(zlib, "requires zlib") | |
def test_read_return_size_deflated(self): | |
self.check_read_return_size(zipfile.ZIP_DEFLATED) | |
def test_empty_zipfile(self): | |
# Check that creating a file in 'w' or 'a' mode and closing without | |
# adding any files to the archives creates a valid empty ZIP file | |
zipf = zipfile.ZipFile(TESTFN, mode="w") | |
zipf.close() | |
try: | |
zipf = zipfile.ZipFile(TESTFN, mode="r") | |
except zipfile.BadZipFile: | |
self.fail("Unable to create empty ZIP file in 'w' mode") | |
zipf = zipfile.ZipFile(TESTFN, mode="a") | |
zipf.close() | |
try: | |
zipf = zipfile.ZipFile(TESTFN, mode="r") | |
except: | |
self.fail("Unable to create empty ZIP file in 'a' mode") | |
def test_open_empty_file(self): | |
# Issue 1710703: Check that opening a file with less than 22 bytes | |
# raises a BadZipFile exception (rather than the previously unhelpful | |
# IOError) | |
f = open(TESTFN, 'w') | |
f.close() | |
self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r') | |
def tearDown(self): | |
unlink(TESTFN) | |
unlink(TESTFN2) | |
class DecryptionTests(unittest.TestCase): | |
"""Check that ZIP decryption works. Since the library does not | |
support encryption at the moment, we use a pre-generated encrypted | |
ZIP file.""" | |
data = ( | |
b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00' | |
b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y' | |
b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl' | |
b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00' | |
b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81' | |
b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00' | |
b'\x00\x00L\x00\x00\x00\x00\x00' ) | |
data2 = ( | |
b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02' | |
b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04' | |
b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0' | |
b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03' | |
b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00' | |
b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze' | |
b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01' | |
b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' ) | |
plain = b'zipfile.py encryption test' | |
plain2 = b'\x00' * 512 | |
def setUp(self): | |
with open(TESTFN, "wb") as fp: | |
fp.write(self.data) | |
self.zip = zipfile.ZipFile(TESTFN, "r") | |
with open(TESTFN2, "wb") as fp: | |
fp.write(self.data2) | |
self.zip2 = zipfile.ZipFile(TESTFN2, "r") | |
def tearDown(self): | |
self.zip.close() | |
os.unlink(TESTFN) | |
self.zip2.close() | |
os.unlink(TESTFN2) | |
def test_no_password(self): | |
# Reading the encrypted file without password | |
# must generate a RunTime exception | |
self.assertRaises(RuntimeError, self.zip.read, "test.txt") | |
self.assertRaises(RuntimeError, self.zip2.read, "zero") | |
def test_bad_password(self): | |
self.zip.setpassword(b"perl") | |
self.assertRaises(RuntimeError, self.zip.read, "test.txt") | |
self.zip2.setpassword(b"perl") | |
self.assertRaises(RuntimeError, self.zip2.read, "zero") | |
@skipUnless(zlib, "requires zlib") | |
def test_good_password(self): | |
self.zip.setpassword(b"python") | |
self.assertEqual(self.zip.read("test.txt"), self.plain) | |
self.zip2.setpassword(b"12345") | |
self.assertEqual(self.zip2.read("zero"), self.plain2) | |
def test_unicode_password(self): | |
self.assertRaises(TypeError, self.zip.setpassword, u"unicode") | |
self.assertRaises(TypeError, self.zip.read, "test.txt", u"python") | |
self.assertRaises(TypeError, self.zip.open, "test.txt", pwd=u"python") | |
self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd=u"python") | |
class TestsWithRandomBinaryFiles(unittest.TestCase): | |
def setUp(self): | |
datacount = randint(16, 64) * 1024 + randint(1, 1024) | |
self.data = b''.join(struct.pack('<f', random() * randint(-1000, 1000)) | |
for i in range(datacount)) | |
# Make a source file with some lines | |
with open(TESTFN, "wb") as fp: | |
fp.write(self.data) | |
def tearDown(self): | |
unlink(TESTFN) | |
unlink(TESTFN2) | |
def make_test_archive(self, f, compression): | |
# Create the ZIP archive | |
with zipfile.ZipFile(f, "w", compression) as zipfp: | |
zipfp.write(TESTFN, "another.name") | |
zipfp.write(TESTFN, TESTFN) | |
def zip_test(self, f, compression): | |
self.make_test_archive(f, compression) | |
# Read the ZIP archive | |
with zipfile.ZipFile(f, "r", compression) as zipfp: | |
testdata = zipfp.read(TESTFN) | |
self.assertEqual(len(testdata), len(self.data)) | |
self.assertEqual(testdata, self.data) | |
self.assertEqual(zipfp.read("another.name"), self.data) | |
if not isinstance(f, str): | |
f.close() | |
def test_stored(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_test(f, zipfile.ZIP_STORED) | |
@skipUnless(zlib, "requires zlib") | |
def test_deflated(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_test(f, zipfile.ZIP_DEFLATED) | |
def zip_open_test(self, f, compression): | |
self.make_test_archive(f, compression) | |
# Read the ZIP archive | |
with zipfile.ZipFile(f, "r", compression) as zipfp: | |
zipdata1 = [] | |
with zipfp.open(TESTFN) as zipopen1: | |
while True: | |
read_data = zipopen1.read(256) | |
if not read_data: | |
break | |
zipdata1.append(read_data) | |
zipdata2 = [] | |
with zipfp.open("another.name") as zipopen2: | |
while True: | |
read_data = zipopen2.read(256) | |
if not read_data: | |
break | |
zipdata2.append(read_data) | |
testdata1 = b''.join(zipdata1) | |
self.assertEqual(len(testdata1), len(self.data)) | |
self.assertEqual(testdata1, self.data) | |
testdata2 = b''.join(zipdata2) | |
self.assertEqual(len(testdata2), len(self.data)) | |
self.assertEqual(testdata2, self.data) | |
if not isinstance(f, str): | |
f.close() | |
def test_open_stored(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_open_test(f, zipfile.ZIP_STORED) | |
@skipUnless(zlib, "requires zlib") | |
def test_open_deflated(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_open_test(f, zipfile.ZIP_DEFLATED) | |
def zip_random_open_test(self, f, compression): | |
self.make_test_archive(f, compression) | |
# Read the ZIP archive | |
with zipfile.ZipFile(f, "r", compression) as zipfp: | |
zipdata1 = [] | |
with zipfp.open(TESTFN) as zipopen1: | |
while True: | |
read_data = zipopen1.read(randint(1, 1024)) | |
if not read_data: | |
break | |
zipdata1.append(read_data) | |
testdata = b''.join(zipdata1) | |
self.assertEqual(len(testdata), len(self.data)) | |
self.assertEqual(testdata, self.data) | |
if not isinstance(f, str): | |
f.close() | |
def test_random_open_stored(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_random_open_test(f, zipfile.ZIP_STORED) | |
@skipUnless(zlib, "requires zlib") | |
def test_random_open_deflated(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.zip_random_open_test(f, zipfile.ZIP_DEFLATED) | |
@skipUnless(zlib, "requires zlib") | |
class TestsWithMultipleOpens(unittest.TestCase): | |
def setUp(self): | |
# Create the ZIP archive | |
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp: | |
zipfp.writestr('ones', '1' * FIXEDTEST_SIZE) | |
zipfp.writestr('twos', '2' * FIXEDTEST_SIZE) | |
def test_same_file(self): | |
# Verify that (when the ZipFile is in control of creating file objects) | |
# multiple open() calls can be made without interfering with each other. | |
with zipfile.ZipFile(TESTFN2, mode="r") as zipf: | |
with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2: | |
data1 = zopen1.read(500) | |
data2 = zopen2.read(500) | |
data1 += zopen1.read(500) | |
data2 += zopen2.read(500) | |
self.assertEqual(data1, data2) | |
def test_different_file(self): | |
# Verify that (when the ZipFile is in control of creating file objects) | |
# multiple open() calls can be made without interfering with each other. | |
with zipfile.ZipFile(TESTFN2, mode="r") as zipf: | |
with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2: | |
data1 = zopen1.read(500) | |
data2 = zopen2.read(500) | |
data1 += zopen1.read(500) | |
data2 += zopen2.read(500) | |
self.assertEqual(data1, b'1' * FIXEDTEST_SIZE) | |
self.assertEqual(data2, b'2' * FIXEDTEST_SIZE) | |
def test_interleaved(self): | |
# Verify that (when the ZipFile is in control of creating file objects) | |
# multiple open() calls can be made without interfering with each other. | |
with zipfile.ZipFile(TESTFN2, mode="r") as zipf: | |
with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2: | |
data1 = zopen1.read(500) | |
data2 = zopen2.read(500) | |
data1 += zopen1.read(500) | |
data2 += zopen2.read(500) | |
self.assertEqual(data1, b'1' * FIXEDTEST_SIZE) | |
self.assertEqual(data2, b'2' * FIXEDTEST_SIZE) | |
def tearDown(self): | |
unlink(TESTFN2) | |
class TestWithDirectory(unittest.TestCase): | |
def setUp(self): | |
os.mkdir(TESTFN2) | |
def test_extract_dir(self): | |
with zipfile.ZipFile(findfile("zipdir.zip")) as zipf: | |
zipf.extractall(TESTFN2) | |
self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a"))) | |
self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b"))) | |
self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c"))) | |
def test_bug_6050(self): | |
# Extraction should succeed if directories already exist | |
os.mkdir(os.path.join(TESTFN2, "a")) | |
self.test_extract_dir() | |
def test_store_dir(self): | |
os.mkdir(os.path.join(TESTFN2, "x")) | |
zipf = zipfile.ZipFile(TESTFN, "w") | |
zipf.write(os.path.join(TESTFN2, "x"), "x") | |
self.assertTrue(zipf.filelist[0].filename.endswith("x/")) | |
def tearDown(self): | |
shutil.rmtree(TESTFN2) | |
if os.path.exists(TESTFN): | |
unlink(TESTFN) | |
class UniversalNewlineTests(unittest.TestCase): | |
def setUp(self): | |
self.line_gen = [bytes("Test of zipfile line %d." % i).encode("ascii") | |
for i in range(FIXEDTEST_SIZE)] | |
self.seps = ('\r', '\r\n', '\n') | |
self.arcdata, self.arcfiles = {}, {} | |
for n, s in enumerate(self.seps): | |
b = s.encode("ascii") | |
self.arcdata[s] = b.join(self.line_gen) + b | |
self.arcfiles[s] = '%s-%d' % (TESTFN, n) | |
f = open(self.arcfiles[s], "wb") | |
try: | |
f.write(self.arcdata[s]) | |
finally: | |
f.close() | |
def make_test_archive(self, f, compression): | |
# Create the ZIP archive | |
with zipfile.ZipFile(f, "w", compression) as zipfp: | |
for fn in self.arcfiles.values(): | |
zipfp.write(fn, fn) | |
def read_test(self, f, compression): | |
self.make_test_archive(f, compression) | |
# Read the ZIP archive | |
with zipfile.ZipFile(f, "r") as zipfp: | |
for sep, fn in self.arcfiles.items(): | |
with zipfp.open(fn, "rU") as fp: | |
zipdata = fp.read() | |
self.assertEqual(self.arcdata[sep], zipdata) | |
if not isinstance(f, str): | |
f.close() | |
def readline_read_test(self, f, compression): | |
self.make_test_archive(f, compression) | |
# Read the ZIP archive | |
with zipfile.ZipFile(f, "r") as zipfp: | |
for sep, fn in self.arcfiles.items(): | |
with zipfp.open(fn, "rU") as zipopen: | |
data = b'' | |
while True: | |
read = zipopen.readline() | |
if not read: | |
break | |
data += read | |
read = zipopen.read(5) | |
if not read: | |
break | |
data += read | |
self.assertEqual(data, self.arcdata['\n']) | |
if not isinstance(f, str): | |
f.close() | |
def readline_test(self, f, compression): | |
self.make_test_archive(f, compression) | |
# Read the ZIP archive | |
with zipfile.ZipFile(f, "r") as zipfp: | |
for sep, fn in self.arcfiles.items(): | |
with zipfp.open(fn, "rU") as zipopen: | |
for line in self.line_gen: | |
linedata = zipopen.readline() | |
self.assertEqual(linedata, line + b'\n') | |
if not isinstance(f, str): | |
f.close() | |
def readlines_test(self, f, compression): | |
self.make_test_archive(f, compression) | |
# Read the ZIP archive | |
with zipfile.ZipFile(f, "r") as zipfp: | |
for sep, fn in self.arcfiles.items(): | |
with zipfp.open(fn, "rU") as fp: | |
ziplines = fp.readlines() | |
for line, zipline in zip(self.line_gen, ziplines): | |
self.assertEqual(zipline, line + b'\n') | |
if not isinstance(f, str): | |
f.close() | |
def iterlines_test(self, f, compression): | |
self.make_test_archive(f, compression) | |
# Read the ZIP archive | |
with zipfile.ZipFile(f, "r") as zipfp: | |
for sep, fn in self.arcfiles.items(): | |
with zipfp.open(fn, "rU") as fp: | |
for line, zipline in zip(self.line_gen, fp): | |
self.assertEqual(zipline, line + b'\n') | |
if not isinstance(f, str): | |
f.close() | |
def test_read_stored(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.read_test(f, zipfile.ZIP_STORED) | |
def test_readline_read_stored(self): | |
# Issue #7610: calls to readline() interleaved with calls to read(). | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.readline_read_test(f, zipfile.ZIP_STORED) | |
def test_readline_stored(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.readline_test(f, zipfile.ZIP_STORED) | |
def test_readlines_stored(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.readlines_test(f, zipfile.ZIP_STORED) | |
def test_iterlines_stored(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.iterlines_test(f, zipfile.ZIP_STORED) | |
@skipUnless(zlib, "requires zlib") | |
def test_read_deflated(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.read_test(f, zipfile.ZIP_DEFLATED) | |
@skipUnless(zlib, "requires zlib") | |
def test_readline_read_deflated(self): | |
# Issue #7610: calls to readline() interleaved with calls to read(). | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.readline_read_test(f, zipfile.ZIP_DEFLATED) | |
@skipUnless(zlib, "requires zlib") | |
def test_readline_deflated(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.readline_test(f, zipfile.ZIP_DEFLATED) | |
@skipUnless(zlib, "requires zlib") | |
def test_readlines_deflated(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.readlines_test(f, zipfile.ZIP_DEFLATED) | |
@skipUnless(zlib, "requires zlib") | |
def test_iterlines_deflated(self): | |
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): | |
self.iterlines_test(f, zipfile.ZIP_DEFLATED) | |
def tearDown(self): | |
for sep, fn in self.arcfiles.items(): | |
os.remove(fn) | |
unlink(TESTFN) | |
unlink(TESTFN2) | |
class RemoveTests(unittest.TestCase): | |
def test_simple(self): | |
fname = "foo.txt" | |
# remove with fname | |
with zipfile.ZipFile(TESTFN, "w") as zf: | |
zf.writestr(fname, "just add a file with a name and some data") | |
self.assertEqual(zf.infolist()[0].filename, fname) | |
with zipfile.ZipFile(TESTFN, "a") as zf: | |
zf.remove(fname) | |
self.assertEqual(len(zf.infolist()), 0) | |
# remove with zipinfo | |
with zipfile.ZipFile(TESTFN, "w") as zf: | |
zf.writestr(fname, "just add a file with a name and some data") | |
self.assertEqual(zf.infolist()[0].filename, fname) | |
with zipfile.ZipFile(TESTFN, "a") as zf: | |
zinfo = zf.getinfo(fname) | |
zf.remove(zinfo) | |
self.assertEqual(len(zf.infolist()), 0) | |
def write_three_and_remove_one(self, index): | |
data_list = [bytes([randint(0, 255) for i in range(10)]) for i in range(3)] | |
fname_list = ["firstfile", "secondfile", "thirdfile"] | |
with zipfile.ZipFile(TESTFN, "w") as zf: | |
for fname, data in zip(fname_list, data_list): | |
zf.writestr(fname, data) | |
with zipfile.ZipFile(TESTFN, "a") as zf: | |
zf.remove(fname_list[index]) | |
i_to_check = list(range(len(data_list))) | |
i_to_check.remove(index) | |
with zipfile.ZipFile(TESTFN, "r") as zf: | |
for i in i_to_check: | |
# the last reads fail if the pointers weren't corrected | |
self.assertEqual(zf.read(fname_list[i]), data_list[i]) | |
def test_remove_middle(self): | |
self.write_three_and_remove_one(0) | |
self.write_three_and_remove_one(1) | |
self.write_three_and_remove_one(2) | |
def test_with_data_descriptor(self): | |
fname = "foo.txt" | |
data = "just add a file with a name and some data" | |
with zipfile.ZipFile(TESTFN, "w") as zf: | |
zinfo = zipfile.ZipInfo(fname) | |
zinfo.flag_bits = zinfo.flag_bits | zipfile._FHF_HAS_DATA_DESCRIPTOR | |
zf.writestr(zinfo, data) | |
with zipfile.ZipFile(TESTFN, "r") as zf: | |
zinfo = zf.getinfo(fname) | |
data_desc_size = zf._get_data_descriptor_size(zinfo) | |
zlen = len(zinfo.FileHeader()) + zinfo.compress_size + len(zf._central_dir_header(zinfo)) + data_desc_size | |
size = os.path.getsize(TESTFN) | |
with zipfile.ZipFile(TESTFN, "a") as zf: | |
zf.remove(fname) | |
new_size = os.path.getsize(TESTFN) | |
size_delta = size - new_size | |
self.assertEqual(size_delta, zlen) | |
def test_shrinks(self): | |
fname = "foo.txt" | |
data = "just add a file with a name and some data" | |
with zipfile.ZipFile(TESTFN, "w") as zf: | |
zf.writestr(fname, data) | |
zinfo = zf.getinfo(fname) | |
# ignoring descriptor size because ZipFile by default doesn't use it | |
zlen = len(zinfo.FileHeader()) + zinfo.compress_size + len(zf._central_dir_header(zinfo)) | |
size = os.path.getsize(TESTFN) | |
with zipfile.ZipFile(TESTFN, "a") as zf: | |
zf.remove(fname) | |
new_size = os.path.getsize(TESTFN) | |
# size was 22 bytes vs 153 bytes on my machine btw | |
self.assertEqual(new_size, size - zlen) | |
def test_verifies_requirements(self): | |
fname = "foo.txt" | |
# test no remove on closed zipfile | |
with self.assertRaises(RuntimeError): | |
zf = zipfile.ZipFile(TESTFN, "w") | |
zf.writestr(fname, "just add a file with a name and some data") | |
zf.close() | |
zf.remove(fname) | |
# test no remove without "a" | |
with self.assertRaises(RuntimeError): | |
with zipfile.ZipFile(TESTFN, "w") as zf: | |
zf.writestr(fname, "just add a file with a name and some data") | |
zf.remove(fname) | |
def test_delete_add(self): | |
fname_list = ["foo.txt", "bar.txt", "blubb.bla", "sup.bro", "rock'n'roll"] | |
data_list = [''.join([chr(randint(0, 255)) for i in range(100)]) for i in range(len(fname_list))] | |
# add some files to the zip | |
with zipfile.ZipFile(TESTFN, "w") as zf: | |
for fname, data in zip(fname_list, data_list): | |
zf.writestr(fname, data) | |
for no in range(0, 3): | |
# remove the middle file and add it again | |
with zipfile.ZipFile(TESTFN, "a") as zf: | |
zf.remove(fname_list[no]) | |
zf.writestr(fname_list[no], data_list[no]) | |
# try to access prior deleted/added file and prior last file (which got moved, while delete) | |
with zipfile.ZipFile(TESTFN, "a") as zf: | |
for fname, data in zip(fname_list, data_list): | |
self.assertEqual(zf.read(fname), data) | |
def test_delete_add_no_close(self): | |
fname_list = ["foo.txt", "bar.txt", "blu.bla", "sup.bro", "rollah"] | |
data_list = [''.join([chr(randint(0, 255)) for i in range(randint(50, 150))]) for i in range(len(fname_list))] | |
# add some files to the zip | |
with zipfile.ZipFile(TESTFN, "w") as zf: | |
for fname, data in zip(fname_list, data_list): | |
zf.writestr(fname, data) | |
for no in range(0, 2): | |
with zipfile.ZipFile(TESTFN, "a") as zf: | |
# remove the middle file and add it again | |
zf.remove(fname_list[no]) | |
zf.writestr(fname_list[no], data_list[no]) | |
zf.remove(fname_list[no+1]) | |
zf.writestr(fname_list[no+1], data_list[no+1]) | |
# try to access prior deleted/added file and prior last file (which got moved, while delete) | |
for fname, data in zip(fname_list, data_list): | |
self.assertEqual(zf.read(fname), data) | |
def tearDown(self): | |
unlink(TESTFN) | |
def test_main(): | |
run_unittest(TestsWithSourceFile, TestZip64InSmallFiles, OtherTests, | |
PyZipFileTests, DecryptionTests, TestsWithMultipleOpens, | |
TestWithDirectory, UniversalNewlineTests, | |
TestsWithRandomBinaryFiles, RemoveTests) | |
if __name__ == "__main__": | |
test_main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment