Skip to content

Instantly share code, notes, and snippets.

@FreakyBytes
Last active April 8, 2016 09:29
Show Gist options
  • Save FreakyBytes/30a6f9866154d82f1c3863f2e4969cc4 to your computer and use it in GitHub Desktop.
Save FreakyBytes/30a6f9866154d82f1c3863f2e4969cc4 to your computer and use it in GitHub Desktop.
fixed modified Python zipfile module
"""
stripped down version of the original Python test support module
Copyright (c) 2001-2016 Python Software Foundation; All Rights Reserved
"""
# 1. This LICENSE AGREEMENT is between the Python Software Foundation ("PSF"), and
# the Individual or Organization ("Licensee") accessing and otherwise using Python
# 2.7.11 software in source or binary form and its associated documentation.
#
# 2. Subject to the terms and conditions of this License Agreement, PSF hereby
# grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce,
# analyze, test, perform and/or display publicly, prepare derivative works,
# distribute, and otherwise use Python 2.7.11 alone or in any derivative
# version, provided, however, that PSF's License Agreement and PSF's notice of
# copyright, i.e., "Copyright (c) 2001-2016 Python Software Foundation; All Rights
# Reserved" are retained in Python 2.7.11 alone or in any derivative version
# prepared by Licensee.
#
# 3. In the event Licensee prepares a derivative work that is based on or
# incorporates Python 2.7.11 or any part thereof, and wants to make the
# derivative work available to others as provided herein, then Licensee hereby
# agrees to include in any such work a brief summary of the changes made to Python
# 2.7.11.
#
# 4. PSF is making Python 2.7.11 available to Licensee on an "AS IS" basis.
# PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED. BY WAY OF
# EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND DISCLAIMS ANY REPRESENTATION OR
# WARRANTY OF MERCHANTABILITY OR FITNESS FOR ANY PARTICULAR PURPOSE OR THAT THE
# USE OF PYTHON 2.7.11 WILL NOT INFRINGE ANY THIRD PARTY RIGHTS.
#
# 5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON 2.7.11
# FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS A RESULT OF
# MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON 2.7.11, OR ANY DERIVATIVE
# THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
#
# 6. This License Agreement will automatically terminate upon a material breach of
# its terms and conditions.
#
# 7. Nothing in this License Agreement shall be deemed to create any relationship
# of agency, partnership, or joint venture between PSF and Licensee. This License
# Agreement does not grant permission to use PSF trademarks or trade name in a
# trademark sense to endorse or promote products or services of Licensee, or any
# third party.
#
# 8. By copying, installing or otherwise using Python 2.7.11, Licensee agrees
# to be bound by the terms and conditions of this License Agreement.
import os
import errno
import unittest
import sys
# Filename used for testing
if os.name == 'java':
# Jython disallows @ in module names
TESTFN = '$test'
else:
TESTFN = '@test'
# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
verbose = 1 # Flag set to 0 by regrtest.py
class BasicTestRunner:
def run(self, test):
result = unittest.TestResult()
test(result)
return result
class Error(Exception):
"""Base class for regression test exceptions."""
class TestFailed(Error):
"""Test failed."""
def _run_suite(suite):
"""Run tests from a unittest.TestSuite-derived class."""
if verbose:
runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
else:
runner = BasicTestRunner()
result = runner.run(suite)
if not result.wasSuccessful():
if len(result.errors) == 1 and not result.failures:
err = result.errors[0][1]
elif len(result.failures) == 1 and not result.errors:
err = result.failures[0][1]
else:
err = "multiple errors occurred"
if not verbose: err += "; run in verbose mode for details"
raise TestFailed(err)
def run_unittest(*classes):
"""Run tests from unittest.TestCase-derived classes."""
valid_types = (unittest.TestSuite, unittest.TestCase)
suite = unittest.TestSuite()
for cls in classes:
if isinstance(cls, str):
if cls in sys.modules:
suite.addTest(unittest.findTestCases(sys.modules[cls]))
else:
raise ValueError("str arguments must be keys in sys.modules")
elif isinstance(cls, valid_types):
suite.addTest(cls)
else:
suite.addTest(unittest.makeSuite(cls))
_run_suite(suite)
def findfile(file, here=__file__, subdir=None):
"""Try to find a file on sys.path and the working directory. If it is not
found the argument passed to the function is returned (this does not
necessarily signal failure; could still be the legitimate path)."""
if os.path.isabs(file):
return file
if subdir is not None:
file = os.path.join(subdir, file)
path = sys.path
path = [os.path.dirname(here)] + path
for dn in path:
fn = os.path.join(dn, file)
if os.path.exists(fn): return fn
return file
def unlink(filename):
try:
os.unlink(filename)
except OSError as error:
# The filename need not exist.
if error.errno not in (errno.ENOENT, errno.ENOTDIR):
raise
"""
Read and write ZIP files.
modified version of the original Python 2.7 zipfile module
Copyright (c) 2001-2016 Python Software Foundation; All Rights Reserved
"""
from __future__ import print_function
# 1. This LICENSE AGREEMENT is between the Python Software Foundation ("PSF"), and
# the Individual or Organization ("Licensee") accessing and otherwise using Python
# 2.7.11 software in source or binary form and its associated documentation.
#
# 2. Subject to the terms and conditions of this License Agreement, PSF hereby
# grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce,
# analyze, test, perform and/or display publicly, prepare derivative works,
# distribute, and otherwise use Python 2.7.11 alone or in any derivative
# version, provided, however, that PSF's License Agreement and PSF's notice of
# copyright, i.e., "Copyright (c) 2001-2016 Python Software Foundation; All Rights
# Reserved" are retained in Python 2.7.11 alone or in any derivative version
# prepared by Licensee.
#
# 3. In the event Licensee prepares a derivative work that is based on or
# incorporates Python 2.7.11 or any part thereof, and wants to make the
# derivative work available to others as provided herein, then Licensee hereby
# agrees to include in any such work a brief summary of the changes made to Python
# 2.7.11.
#
# 4. PSF is making Python 2.7.11 available to Licensee on an "AS IS" basis.
# PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED. BY WAY OF
# EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND DISCLAIMS ANY REPRESENTATION OR
# WARRANTY OF MERCHANTABILITY OR FITNESS FOR ANY PARTICULAR PURPOSE OR THAT THE
# USE OF PYTHON 2.7.11 WILL NOT INFRINGE ANY THIRD PARTY RIGHTS.
#
# 5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON 2.7.11
# FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS A RESULT OF
# MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON 2.7.11, OR ANY DERIVATIVE
# THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
#
# 6. This License Agreement will automatically terminate upon a material breach of
# its terms and conditions.
#
# 7. Nothing in this License Agreement shall be deemed to create any relationship
# of agency, partnership, or joint venture between PSF and Licensee. This License
# Agreement does not grant permission to use PSF trademarks or trade name in a
# trademark sense to endorse or promote products or services of Licensee, or any
# third party.
#
# 8. By copying, installing or otherwise using Python 2.7.11, Licensee agrees
# to be bound by the terms and conditions of this License Agreement.
"""
Read and write ZIP files.
XXX references to utf-8 need further investigation.
"""
import io
import os
import re
import imp
import sys
import time
import stat
import shutil
import struct
import binascii
try:
import zlib # We may need its compression method
crc32 = zlib.crc32
except ImportError:
zlib = None
crc32 = binascii.crc32
__all__ = ["BadZipFile", "BadZipfile", "error", "ZIP_STORED", "ZIP_DEFLATED",
"is_zipfile", "ZipInfo", "ZipFile", "PyZipFile", "LargeZipFile"]
class BadZipFile(Exception):
pass
class LargeZipFile(Exception):
"""
Raised when writing a zipfile, the zipfile requires ZIP64 extensions
and those extensions are disabled.
"""
error = BadZipfile = BadZipFile # Pre-3.2 compatibility names
ZIP64_LIMIT = (1 << 31) - 1
ZIP_FILECOUNT_LIMIT = 1 << 16
ZIP_MAX_COMMENT = (1 << 16) - 1
# constants for Zip file compression methods
ZIP_STORED = 0
ZIP_DEFLATED = 8
# Other ZIP compression methods not supported
# Below are some formats and associated data for reading/writing headers using
# the struct module. The names and structures of headers/records are those used
# in the PKWARE description of the ZIP file format:
# http://www.pkware.com/documents/casestudies/APPNOTE.TXT
# (URL valid as of January 2008)
# The "end of central directory" structure, magic number, size, and indices
# (section V.I in the format document)
structEndArchive = b"<4s4H2LH"
stringEndArchive = b"PK\005\006"
sizeEndCentDir = struct.calcsize(structEndArchive)
_ECD_SIGNATURE = 0
_ECD_DISK_NUMBER = 1
_ECD_DISK_START = 2
_ECD_ENTRIES_THIS_DISK = 3
_ECD_ENTRIES_TOTAL = 4
_ECD_SIZE = 5
_ECD_OFFSET = 6
_ECD_COMMENT_SIZE = 7
# These last two indices are not part of the structure as defined in the
# spec, but they are used internally by this module as a convenience
_ECD_COMMENT = 8
_ECD_LOCATION = 9
# The "central directory" structure, magic number, size, and indices
# of entries in the structure (section V.F in the format document)
structCentralDir = "<4s4B4HL2L5H2L"
stringCentralDir = b"PK\001\002"
sizeCentralDir = struct.calcsize(structCentralDir)
# indexes of entries in the central directory structure
_CD_SIGNATURE = 0
_CD_CREATE_VERSION = 1
_CD_CREATE_SYSTEM = 2
_CD_EXTRACT_VERSION = 3
_CD_EXTRACT_SYSTEM = 4
_CD_FLAG_BITS = 5
_CD_COMPRESS_TYPE = 6
_CD_TIME = 7
_CD_DATE = 8
_CD_CRC = 9
_CD_COMPRESSED_SIZE = 10
_CD_UNCOMPRESSED_SIZE = 11
_CD_FILENAME_LENGTH = 12
_CD_EXTRA_FIELD_LENGTH = 13
_CD_COMMENT_LENGTH = 14
_CD_DISK_NUMBER_START = 15
_CD_INTERNAL_FILE_ATTRIBUTES = 16
_CD_EXTERNAL_FILE_ATTRIBUTES = 17
_CD_LOCAL_HEADER_OFFSET = 18
# The "local file header" structure, magic number, size, and indices
# (section V.A in the format document)
structFileHeader = "<4s2B4HL2L2H"
stringFileHeader = b"PK\003\004"
sizeFileHeader = struct.calcsize(structFileHeader)
_FH_SIGNATURE = 0
_FH_EXTRACT_VERSION = 1
_FH_EXTRACT_SYSTEM = 2
_FH_GENERAL_PURPOSE_FLAG_BITS = 3
_FH_COMPRESSION_METHOD = 4
_FH_LAST_MOD_TIME = 5
_FH_LAST_MOD_DATE = 6
_FH_CRC = 7
_FH_COMPRESSED_SIZE = 8
_FH_UNCOMPRESSED_SIZE = 9
_FH_FILENAME_LENGTH = 10
_FH_EXTRA_FIELD_LENGTH = 11
_FHF_HAS_DATA_DESCRIPTOR = 0x8
dataDescriptorSignature = 0x08074b50
# The "Zip64 end of central directory locator" structure, magic number, and size
structEndArchive64Locator = "<4sLQL"
stringEndArchive64Locator = b"PK\x06\x07"
sizeEndCentDir64Locator = struct.calcsize(structEndArchive64Locator)
# The "Zip64 end of central directory" record, magic number, size, and indices
# (section V.G in the format document)
structEndArchive64 = "<4sQ2H2L4Q"
stringEndArchive64 = b"PK\x06\x06"
sizeEndCentDir64 = struct.calcsize(structEndArchive64)
_CD64_SIGNATURE = 0
_CD64_DIRECTORY_RECSIZE = 1
_CD64_CREATE_VERSION = 2
_CD64_EXTRACT_VERSION = 3
_CD64_DISK_NUMBER = 4
_CD64_DISK_NUMBER_START = 5
_CD64_NUMBER_ENTRIES_THIS_DISK = 6
_CD64_NUMBER_ENTRIES_TOTAL = 7
_CD64_DIRECTORY_SIZE = 8
_CD64_OFFSET_START_CENTDIR = 9
def _check_zipfile(fp):
try:
if _EndRecData(fp):
return True # file has correct magic number
except IOError:
pass
return False
def is_zipfile(filename):
"""Quickly see if a file is a ZIP file by checking the magic number.
The filename argument may be a file or file-like object too.
"""
result = False
try:
if hasattr(filename, "read"):
result = _check_zipfile(fp=filename)
else:
with open(filename, "rb") as fp:
result = _check_zipfile(fp)
except IOError:
pass
return result
def _EndRecData64(fpin, offset, endrec):
"""
Read the ZIP64 end-of-archive records and use that to update endrec
"""
try:
fpin.seek(offset - sizeEndCentDir64Locator, 2)
except IOError:
# If the seek fails, the file is not large enough to contain a ZIP64
# end-of-archive record, so just return the end record we were given.
return endrec
data = fpin.read(sizeEndCentDir64Locator)
sig, diskno, reloff, disks = struct.unpack(structEndArchive64Locator, data)
if sig != stringEndArchive64Locator:
return endrec
if diskno != 0 or disks != 1:
raise BadZipFile("zipfiles that span multiple disks are not supported")
# Assume no 'zip64 extensible data'
fpin.seek(offset - sizeEndCentDir64Locator - sizeEndCentDir64, 2)
data = fpin.read(sizeEndCentDir64)
sig, sz, create_version, read_version, disk_num, disk_dir, \
dircount, dircount2, dirsize, diroffset = \
struct.unpack(structEndArchive64, data)
if sig != stringEndArchive64:
return endrec
# Update the original endrec using data from the ZIP64 record
endrec[_ECD_SIGNATURE] = sig
endrec[_ECD_DISK_NUMBER] = disk_num
endrec[_ECD_DISK_START] = disk_dir
endrec[_ECD_ENTRIES_THIS_DISK] = dircount
endrec[_ECD_ENTRIES_TOTAL] = dircount2
endrec[_ECD_SIZE] = dirsize
endrec[_ECD_OFFSET] = diroffset
return endrec
def _EndRecData(fpin):
"""Return data from the "End of Central Directory" record, or None.
The data is a list of the nine items in the ZIP "End of central dir"
record followed by a tenth item, the file seek offset of this record."""
# Determine file size
fpin.seek(0, 2)
filesize = fpin.tell()
# Check to see if this is ZIP file with no archive comment (the
# "end of central directory" structure should be the last item in the
# file if this is the case).
try:
fpin.seek(-sizeEndCentDir, 2)
except IOError:
return None
data = fpin.read()
if data[0:4] == stringEndArchive and data[-2:] == b"\000\000":
# the signature is correct and there's no comment, unpack structure
endrec = struct.unpack(structEndArchive, data)
endrec=list(endrec)
# Append a blank comment and record start offset
endrec.append(b"")
endrec.append(filesize - sizeEndCentDir)
# Try to read the "Zip64 end of central directory" structure
return _EndRecData64(fpin, -sizeEndCentDir, endrec)
# Either this is not a ZIP file, or it is a ZIP file with an archive
# comment. Search the end of the file for the "end of central directory"
# record signature. The comment is the last item in the ZIP file and may be
# up to 64K long. It is assumed that the "end of central directory" magic
# number does not appear in the comment.
maxCommentStart = max(filesize - (1 << 16) - sizeEndCentDir, 0)
fpin.seek(maxCommentStart, 0)
data = fpin.read()
start = data.rfind(stringEndArchive)
if start >= 0:
# found the magic number; attempt to unpack and interpret
recData = data[start:start+sizeEndCentDir]
endrec = list(struct.unpack(structEndArchive, recData))
comment = data[start+sizeEndCentDir:]
# check that comment length is correct
if endrec[_ECD_COMMENT_SIZE] == len(comment):
# Append the archive comment and start offset
endrec.append(comment)
endrec.append(maxCommentStart + start)
# Try to read the "Zip64 end of central directory" structure
return _EndRecData64(fpin, maxCommentStart + start - filesize,
endrec)
# Unable to find a valid end of central directory structure
return
class ZipInfo (object):
"""Class with attributes describing each file in the ZIP archive."""
__slots__ = (
'orig_filename',
'filename',
'date_time',
'compress_type',
'comment',
'extra',
'create_system',
'create_version',
'extract_version',
'reserved',
'flag_bits',
'volume',
'internal_attr',
'external_attr',
'header_offset',
'CRC',
'compress_size',
'file_size',
'_raw_time',
)
def __init__(self, filename="NoName", date_time=(1980,1,1,0,0,0)):
self.orig_filename = filename # Original file name in archive
# Terminate the file name at the first null byte. Null bytes in file
# names are used as tricks by viruses in archives.
null_byte = filename.find(chr(0))
if null_byte >= 0:
filename = filename[0:null_byte]
# This is used to ensure paths in generated ZIP files always use
# forward slashes as the directory separator, as required by the
# ZIP format specification.
if os.sep != "/" and os.sep in filename:
filename = filename.replace(os.sep, "/")
self.filename = filename # Normalized file name
self.date_time = date_time # year, month, day, hour, min, sec
# Standard values:
self.compress_type = ZIP_STORED # Type of compression for the file
self.comment = b"" # Comment for each file
self.extra = b"" # ZIP extra data
if sys.platform == 'win32':
self.create_system = 0 # System which created ZIP archive
else:
# Assume everything else is unix-y
self.create_system = 3 # System which created ZIP archive
self.create_version = 20 # Version which created ZIP archive
self.extract_version = 20 # Version needed to extract archive
self.reserved = 0 # Must be zero
self.flag_bits = 0 # ZIP flag bits
self.volume = 0 # Volume number of file header
self.internal_attr = 0 # Internal attributes
self.external_attr = 0 # External file attributes
# Other attributes are set by class ZipFile:
# header_offset Byte offset to the file header
# CRC CRC-32 of the uncompressed file
# compress_size Size of the compressed file
# file_size Size of the uncompressed file
def FileHeader(self):
"""Return the per-file header as a string."""
dt = self.date_time
dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
if self.flag_bits & _FHF_HAS_DATA_DESCRIPTOR:
# Set these to zero because we write them after the file data
CRC = compress_size = file_size = 0
else:
CRC = self.CRC
compress_size = self.compress_size
file_size = self.file_size
extra = self.extra
if file_size > ZIP64_LIMIT or compress_size > ZIP64_LIMIT:
# File is larger than what fits into a 4 byte integer,
# fall back to the ZIP64 extension
fmt = '<HHQQ'
extra = extra + struct.pack(fmt,
1, struct.calcsize(fmt)-4, file_size, compress_size)
file_size = 0xffffffff
compress_size = 0xffffffff
self.extract_version = max(45, self.extract_version)
self.create_version = max(45, self.extract_version)
filename, flag_bits = self._encodeFilenameFlags()
header = struct.pack(structFileHeader, stringFileHeader,
self.extract_version, self.reserved, flag_bits,
self.compress_type, dostime, dosdate, CRC,
compress_size, file_size,
len(filename), len(extra))
return header + filename + extra
def _encodeFilenameFlags(self):
if sys.version_info[0] >= 3 or isinstance(self.filename, unicode):
try:
return self.filename.encode('ascii'), self.flag_bits
except UnicodeEncodeError:
return self.filename.encode('utf-8'), self.flag_bits | 0x800
else:
return self.filename, self.flag_bits
def _decodeExtra(self):
# Try to decode the extra field.
extra = self.extra
unpack = struct.unpack
while extra:
tp, ln = unpack('<HH', extra[:4])
if tp == 1:
if ln >= 24:
counts = unpack('<QQQ', extra[4:28])
elif ln == 16:
counts = unpack('<QQ', extra[4:20])
elif ln == 8:
counts = unpack('<Q', extra[4:12])
elif ln == 0:
counts = ()
else:
raise RuntimeError("Corrupt extra field %s"%(ln,))
idx = 0
# ZIP64 extension (large files and/or large archives)
if self.file_size in (0xffffffffffffffff, 0xffffffff):
self.file_size = counts[idx]
idx += 1
if self.compress_size == 0xFFFFFFFF:
self.compress_size = counts[idx]
idx += 1
if self.header_offset == 0xffffffff:
old = self.header_offset
self.header_offset = counts[idx]
idx+=1
extra = extra[ln+4:]
class _ZipDecrypter:
"""Class to handle decryption of files stored within a ZIP archive.
ZIP supports a password-based form of encryption. Even though known
plaintext attacks have been found against it, it is still useful
to be able to get data out of such a file.
Usage:
zd = _ZipDecrypter(mypwd)
plain_char = zd(cypher_char)
plain_text = map(zd, cypher_text)
"""
def _GenerateCRCTable():
"""Generate a CRC-32 table.
ZIP encryption uses the CRC32 one-byte primitive for scrambling some
internal keys. We noticed that a direct implementation is faster than
relying on binascii.crc32().
"""
poly = 0xedb88320
table = [0] * 256
for i in range(256):
crc = i
for j in range(8):
if crc & 1:
crc = ((crc >> 1) & 0x7FFFFFFF) ^ poly
else:
crc = ((crc >> 1) & 0x7FFFFFFF)
table[i] = crc
return table
crctable = _GenerateCRCTable()
def _crc32(self, ch, crc):
"""Compute the CRC32 primitive on one byte."""
if isinstance(ch, (str, unicode)):
ch = ord(ch)
return ((crc >> 8) & 0xffffff) ^ self.crctable[(crc ^ ch) & 0xff]
def __init__(self, pwd):
self.key0 = 305419896
self.key1 = 591751049
self.key2 = 878082192
for p in pwd:
self._UpdateKeys(p)
def _UpdateKeys(self, c):
self.key0 = self._crc32(c, self.key0)
self.key1 = (self.key1 + (self.key0 & 255)) & 4294967295
self.key1 = (self.key1 * 134775813 + 1) & 4294967295
self.key2 = self._crc32((self.key1 >> 24) & 255, self.key2)
def __call__(self, c):
"""Decrypt a single character."""
if isinstance(c, (str, unicode)):
c = ord(c)
k = self.key2 | 2
c = c ^ (((k * (k^1)) >> 8) & 255)
self._UpdateKeys(c)
return c
class ZipExtFile(io.BufferedIOBase):
"""File-like object for reading an archive member.
Is returned by ZipFile.open().
"""
# Max size supported by decompressor.
MAX_N = 1 << 31 - 1
# Read from compressed files in 4k blocks.
MIN_READ_SIZE = 4096
# Search for universal newlines or line chunks.
PATTERN = re.compile(br'^(?P<chunk>[^\r\n]+)|(?P<newline>\n|\r\n?)')
def __init__(self, fileobj, mode, zipinfo, decrypter=None,
close_fileobj=False):
self._fileobj = fileobj
self._decrypter = decrypter
self._close_fileobj = close_fileobj
self._compress_type = zipinfo.compress_type
self._compress_size = zipinfo.compress_size
self._compress_left = zipinfo.compress_size
if self._compress_type == ZIP_DEFLATED:
self._decompressor = zlib.decompressobj(-15)
self._unconsumed = b''
self._readbuffer = b''
self._offset = 0
self._universal = 'U' in mode
self.newlines = None
# Adjust read size for encrypted files since the first 12 bytes
# are for the encryption/password information.
if self._decrypter is not None:
self._compress_left -= 12
self.mode = mode
self.name = zipinfo.filename
if hasattr(zipinfo, 'CRC'):
self._expected_crc = zipinfo.CRC
self._running_crc = crc32(b'') & 0xffffffff
else:
self._expected_crc = None
def readline(self, limit=-1):
"""Read and return a line from the stream.
If limit is specified, at most limit bytes will be read.
"""
if not self._universal and limit < 0:
# Shortcut common case - newline found in buffer.
i = self._readbuffer.find(b'\n', self._offset) + 1
if i > 0:
line = self._readbuffer[self._offset: i]
self._offset = i
return line
if not self._universal:
return io.BufferedIOBase.readline(self, limit)
line = b''
while limit < 0 or len(line) < limit:
readahead = self.peek(2)
if readahead == b'':
return line
#
# Search for universal newlines or line chunks.
#
# The pattern returns either a line chunk or a newline, but not
# both. Combined with peek(2), we are assured that the sequence
# '\r\n' is always retrieved completely and never split into
# separate newlines - '\r', '\n' due to coincidental readaheads.
#
match = self.PATTERN.search(readahead)
newline = match.group('newline')
if newline is not None:
if self.newlines is None:
self.newlines = []
if newline not in self.newlines:
self.newlines.append(newline)
self._offset += len(newline)
return line + b'\n'
chunk = match.group('chunk')
if limit >= 0:
chunk = chunk[: limit - len(line)]
self._offset += len(chunk)
line += chunk
return line
def peek(self, n=1):
"""Returns buffered bytes without advancing the position."""
if n > len(self._readbuffer) - self._offset:
chunk = self.read(n)
self._offset -= len(chunk)
# Return up to 512 bytes to reduce allocation overhead for tight loops.
return self._readbuffer[self._offset: self._offset + 512]
def readable(self):
return True
def read(self, n=-1):
"""Read and return up to n bytes.
If the argument is omitted, None, or negative, data is read and returned until EOF is reached..
"""
buf = b''
if n is None:
n = -1
while True:
if n < 0:
data = self.read1(n)
elif n > len(buf):
data = self.read1(n - len(buf))
else:
return buf
if len(data) == 0:
return buf
buf += data
def _update_crc(self, newdata, eof):
# Update the CRC using the given data.
if self._expected_crc is None:
# No need to compute the CRC if we don't have a reference value
return
self._running_crc = crc32(newdata, self._running_crc) & 0xffffffff
# Check the CRC if we're at the end of the file
if eof and self._running_crc != self._expected_crc:
raise BadZipFile("Bad CRC-32 for file %r" % self.name)
def read1(self, n):
"""Read up to n bytes with at most one read() system call."""
# Simplify algorithm (branching) by transforming negative n to large n.
if n < 0 or n is None:
n = self.MAX_N
# Bytes available in read buffer.
len_readbuffer = len(self._readbuffer) - self._offset
# Read from file.
if self._compress_left > 0 and n > len_readbuffer + len(self._unconsumed):
nbytes = n - len_readbuffer - len(self._unconsumed)
nbytes = max(nbytes, self.MIN_READ_SIZE)
nbytes = min(nbytes, self._compress_left)
data = self._fileobj.read(nbytes)
self._compress_left -= len(data)
if data and self._decrypter is not None:
data = ''.join(chr(i) for i in map(self._decrypter, data))
if self._compress_type == ZIP_STORED:
self._update_crc(data, eof=(self._compress_left==0))
self._readbuffer = self._readbuffer[self._offset:] + data
self._offset = 0
else:
# Prepare deflated bytes for decompression.
self._unconsumed += data
# Handle unconsumed data.
if (len(self._unconsumed) > 0 and n > len_readbuffer and
self._compress_type == ZIP_DEFLATED):
data = self._decompressor.decompress(
self._unconsumed,
max(n - len_readbuffer, self.MIN_READ_SIZE)
)
self._unconsumed = self._decompressor.unconsumed_tail
eof = len(self._unconsumed) == 0 and self._compress_left == 0
if eof:
data += self._decompressor.flush()
self._update_crc(data, eof=eof)
self._readbuffer = self._readbuffer[self._offset:] + data
self._offset = 0
# Read from buffer.
data = self._readbuffer[self._offset: self._offset + n]
self._offset += len(data)
return data
def close(self):
try:
if self._close_fileobj:
self._fileobj.close()
finally:
super(ZipExtFile, self).close()
class ZipFile(object):
""" Class with methods to open, read, write, remove, close, list zip files.
z = ZipFile(file, mode="r", compression=ZIP_STORED, allowZip64=False)
file: Either the path to the file, or a file-like object.
If it is a path, the file will be opened and closed by ZipFile.
mode: The mode can be either read "r", write "w" or append "a".
compression: ZIP_STORED (no compression) or ZIP_DEFLATED (requires zlib).
allowZip64: if True ZipFile will create files with ZIP64 extensions when
needed, otherwise it will raise an exception when this would
be necessary.
"""
fp = None # Set here since __del__ checks it
def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=False):
"""Open the ZIP file with mode read "r", write "w" or append "a"."""
if mode not in ("r", "w", "a"):
raise RuntimeError('ZipFile() requires mode "r", "w", or "a"')
if compression == ZIP_STORED:
pass
elif compression == ZIP_DEFLATED:
if not zlib:
raise RuntimeError(
"Compression requires the (missing) zlib module")
else:
raise RuntimeError("That compression method is not supported")
self._allowZip64 = allowZip64
self._didModify = False
self.debug = 0 # Level of printing: 0 through 3
self.NameToInfo = {} # Find file info given name
self.filelist = [] # List of ZipInfo instances for archive
self.compression = compression # Method of compression
self.mode = key = mode.replace('b', '')[0]
self.pwd = None
self.comment = b''
# Check if we were passed a file-like object
if isinstance(file, str):
# No, it's a filename
self._filePassed = 0
self.filename = file
modeDict = {'r' : 'rb', 'w': 'wb', 'a' : 'r+b'}
try:
self.fp = io.open(file, modeDict[mode])
except IOError:
if mode == 'a':
mode = key = 'w'
self.fp = io.open(file, modeDict[mode])
else:
raise
else:
self._filePassed = 1
self.fp = file
self.filename = getattr(file, 'name', None)
if key == 'r':
self._GetContents()
elif key == 'w':
# set the modified flag so central directory gets written
# even if no files are added to the archive
self._didModify = True
elif key == 'a':
try:
# See if file is a zip file
self._RealGetContents()
# seek to start of directory and overwrite
self.fp.seek(self.start_dir, 0)
except BadZipFile:
# file is not a zip file, just append
self.fp.seek(0, 2)
# set the modified flag so central directory gets written
# even if no files are added to the archive
self._didModify = True
else:
if not self._filePassed:
self.fp.close()
self.fp = None
raise RuntimeError('Mode must be "r", "w" or "a"')
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
def _GetContents(self):
"""Read the directory, making sure we close the file if the format
is bad."""
try:
self._RealGetContents()
except BadZipFile:
if not self._filePassed:
self.fp.close()
self.fp = None
raise
def _RealGetContents(self):
"""Read in the table of contents for the ZIP file."""
fp = self.fp
try:
endrec = _EndRecData(fp)
except IOError:
raise BadZipFile("File is not a zip file")
if not endrec:
raise BadZipFile("File is not a zip file")
if self.debug > 1:
print(endrec)
size_cd = endrec[_ECD_SIZE] # bytes in central directory
offset_cd = endrec[_ECD_OFFSET] # offset of central directory
self.comment = endrec[_ECD_COMMENT] # archive comment
# "concat" is zero, unless zip was concatenated to another file
concat = endrec[_ECD_LOCATION] - size_cd - offset_cd
if endrec[_ECD_SIGNATURE] == stringEndArchive64:
# If Zip64 extension structures are present, account for them
concat -= (sizeEndCentDir64 + sizeEndCentDir64Locator)
if self.debug > 2:
inferred = concat + offset_cd
print(u"given, inferred, offset", offset_cd, inferred, concat)
# self.start_dir: Position of start of central directory
self.start_dir = offset_cd + concat
fp.seek(self.start_dir, 0)
data = fp.read(size_cd)
fp = io.BytesIO(data)
total = 0
while total < size_cd:
centdir = fp.read(sizeCentralDir)
if centdir[0:4] != stringCentralDir:
raise BadZipFile("Bad magic number for central directory")
centdir = struct.unpack(structCentralDir, centdir)
if self.debug > 2:
print(centdir)
filename = fp.read(centdir[_CD_FILENAME_LENGTH])
flags = centdir[5]
if flags & 0x800:
# UTF-8 file names extension
filename = filename.decode('utf-8')
else:
# Historical ZIP filename encoding
filename = filename.decode('cp437')
# Create ZipInfo instance to store file information
x = ZipInfo(filename)
x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH])
x.comment = fp.read(centdir[_CD_COMMENT_LENGTH])
x.header_offset = centdir[_CD_LOCAL_HEADER_OFFSET]
(x.create_version, x.create_system, x.extract_version, x.reserved,
x.flag_bits, x.compress_type, t, d,
x.CRC, x.compress_size, x.file_size) = centdir[1:12]
x.volume, x.internal_attr, x.external_attr = centdir[15:18]
# Convert date/time code to (year, month, day, hour, min, sec)
x._raw_time = t
x.date_time = ( (d>>9)+1980, (d>>5)&0xF, d&0x1F,
t>>11, (t>>5)&0x3F, (t&0x1F) * 2 )
x._decodeExtra()
x.header_offset = x.header_offset + concat
self.filelist.append(x)
self.NameToInfo[x.filename] = x
# update total bytes read from central directory
total = (total + sizeCentralDir + centdir[_CD_FILENAME_LENGTH]
+ centdir[_CD_EXTRA_FIELD_LENGTH]
+ centdir[_CD_COMMENT_LENGTH])
if self.debug > 2:
print(u"total", total)
def namelist(self):
"""Return a list of file names in the archive."""
l = []
for data in self.filelist:
l.append(data.filename)
return l
def infolist(self):
"""Return a list of class ZipInfo instances for files in the
archive."""
return self.filelist
def printdir(self, file=None):
"""Print a table of contents for the zip file."""
print(u"%-46s %19s %12s" % (u"File Name", u"Modified ", u"Size"),
file=file)
for zinfo in self.filelist:
date = u"%d-%02d-%02d %02d:%02d:%02d" % zinfo.date_time[:6]
print(u"%-46s %s %12d" % (zinfo.filename, date, zinfo.file_size),
file=file)
def testzip(self):
"""Read all the files and check the CRC."""
chunk_size = 2 ** 20
for zinfo in self.filelist:
try:
# Read by chunks, to avoid an OverflowError or a
# MemoryError with very large embedded files.
f = self.open(zinfo.filename, "r")
while f.read(chunk_size): # Check CRC-32
pass
except BadZipFile:
return zinfo.filename
def getinfo(self, name):
"""Return the instance of ZipInfo given 'name'."""
info = self.NameToInfo.get(name)
if info is None:
raise KeyError(
'There is no item named %r in the archive' % name)
return info
def setpassword(self, pwd):
"""Set default password for encrypted files."""
if pwd and not isinstance(pwd, bytes):
raise TypeError("pwd: expected bytes, got %s" % type(pwd))
if pwd:
self.pwd = pwd
else:
self.pwd = None
def read(self, name, pwd=None):
"""Return file bytes (as a string) for name."""
with self.open(name, "r", pwd) as fp:
return fp.read()
def open(self, name, mode="r", pwd=None):
"""Return file-like object for 'name'."""
if mode not in ("r", "U", "rU"):
raise RuntimeError('open() requires mode "r", "U", or "rU"')
if pwd and not isinstance(pwd, bytes):
raise TypeError("pwd: expected bytes, got %s" % type(pwd))
if not self.fp:
raise RuntimeError(
"Attempt to read ZIP archive that was already closed")
# Only open a new file for instances where we were not
# given a file object in the constructor
if self._filePassed:
zef_file = self.fp
else:
zef_file = io.open(self.filename, 'rb')
# Make sure we have an info object
if isinstance(name, ZipInfo):
# 'name' is already an info object
zinfo = name
else:
# Get info object for name
try:
zinfo = self.getinfo(name)
except KeyError:
if not self._filePassed:
zef_file.close()
raise
zef_file.seek(zinfo.header_offset, 0)
# Skip the file header:
fheader = zef_file.read(sizeFileHeader)
if fheader[0:4] != stringFileHeader:
raise BadZipFile("Bad magic number for file header")
fheader = struct.unpack(structFileHeader, fheader)
fname = zef_file.read(fheader[_FH_FILENAME_LENGTH])
if fheader[_FH_EXTRA_FIELD_LENGTH]:
zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH])
if zinfo.flag_bits & 0x800:
# UTF-8 filename
fname_str = fname.decode("utf-8")
else:
fname_str = fname.decode("cp437")
if fname_str != zinfo.orig_filename:
if not self._filePassed:
zef_file.close()
raise BadZipFile(
'File name in directory %r and header %r differ.'
% (zinfo.orig_filename, fname))
# check for encrypted flag & handle password
is_encrypted = zinfo.flag_bits & 0x1
zd = None
if is_encrypted:
if not pwd:
pwd = self.pwd
if not pwd:
if not self._filePassed:
zef_file.close()
raise RuntimeError("File %s is encrypted, "
"password required for extraction" % name)
zd = _ZipDecrypter(pwd)
# The first 12 bytes in the cypher stream is an encryption header
# used to strengthen the algorithm. The first 11 bytes are
# completely random, while the 12th contains the MSB of the CRC,
# or the MSB of the file time depending on the header type
# and is used to check the correctness of the password.
header = zef_file.read(12)
h = list(map(zd, header[0:12]))
if zinfo.flag_bits & _FHF_HAS_DATA_DESCRIPTOR:
# compare against the file type from extended local headers
check_byte = (zinfo._raw_time >> 8) & 0xff
else:
# compare against the CRC otherwise
check_byte = (zinfo.CRC >> 24) & 0xff
if h[11] != check_byte:
if not self._filePassed:
zef_file.close()
raise RuntimeError("Bad password for file", name)
return ZipExtFile(zef_file, mode, zinfo, zd,
close_fileobj=not self._filePassed)
def extract(self, member, path=None, pwd=None):
"""Extract a member from the archive to the current working directory,
using its full name. Its file information is extracted as accurately
as possible. `member' may be a filename or a ZipInfo object. You can
specify a different directory using `path'.
"""
if not isinstance(member, ZipInfo):
member = self.getinfo(member)
if path is None:
path = os.getcwd()
return self._extract_member(member, path, pwd)
def extractall(self, path=None, members=None, pwd=None):
"""Extract all members from the archive to the current working
directory. `path' specifies a different directory to extract to.
`members' is optional and must be a subset of the list returned
by namelist().
"""
if members is None:
members = self.namelist()
for zipinfo in members:
self.extract(zipinfo, path, pwd)
def _extract_member(self, member, targetpath, pwd):
"""Extract the ZipInfo object 'member' to a physical
file on the path targetpath.
"""
# build the destination pathname, replacing
# forward slashes to platform specific separators.
# Strip trailing path separator, unless it represents the root.
if (targetpath[-1:] in (os.path.sep, os.path.altsep)
and len(os.path.splitdrive(targetpath)[1]) > 1):
targetpath = targetpath[:-1]
# don't include leading "/" from file name if present
if member.filename[0] == '/':
targetpath = os.path.join(targetpath, member.filename[1:])
else:
targetpath = os.path.join(targetpath, member.filename)
targetpath = os.path.normpath(targetpath)
# Create all upper directories if necessary.
upperdirs = os.path.dirname(targetpath)
if upperdirs and not os.path.exists(upperdirs):
os.makedirs(upperdirs)
if member.filename[-1] == '/':
if not os.path.isdir(targetpath):
os.mkdir(targetpath)
return targetpath
source = self.open(member, pwd=pwd)
target = open(targetpath, "wb")
shutil.copyfileobj(source, target)
source.close()
target.close()
return targetpath
def _writecheck(self, zinfo):
"""Check for errors before writing a file to the archive."""
if zinfo.filename in self.NameToInfo:
if self.debug: # Warning for duplicate names
print(u"Duplicate name:", zinfo.filename)
if self.mode not in ("w", "a"):
raise RuntimeError('write() requires mode "w" or "a"')
if not self.fp:
raise RuntimeError(
"Attempt to write ZIP archive that was already closed")
if zinfo.compress_type == ZIP_DEFLATED and not zlib:
raise RuntimeError(
"Compression requires the (missing) zlib module")
if zinfo.compress_type not in (ZIP_STORED, ZIP_DEFLATED):
raise RuntimeError("That compression method is not supported")
if zinfo.file_size > ZIP64_LIMIT:
if not self._allowZip64:
raise LargeZipFile("Filesize would require ZIP64 extensions")
if zinfo.header_offset > ZIP64_LIMIT:
if not self._allowZip64:
raise LargeZipFile(
"Zipfile size would require ZIP64 extensions")
def write(self, filename, arcname=None, compress_type=None):
"""Put the bytes from filename into the archive under the name
arcname."""
if not self.fp:
raise RuntimeError(
"Attempt to write to ZIP archive that was already closed")
st = os.stat(filename)
isdir = stat.S_ISDIR(st.st_mode)
mtime = time.localtime(st.st_mtime)
date_time = mtime[0:6]
# Create ZipInfo instance to store file information
if arcname is None:
arcname = filename
arcname = os.path.normpath(os.path.splitdrive(arcname)[1])
while arcname[0] in (os.sep, os.altsep):
arcname = arcname[1:]
if isdir:
arcname += '/'
zinfo = ZipInfo(arcname, date_time)
zinfo.external_attr = (st[0] & 0xFFFF) << 16 # Unix attributes
if compress_type is None:
zinfo.compress_type = self.compression
else:
zinfo.compress_type = compress_type
zinfo.file_size = st.st_size
zinfo.flag_bits = 0x00
zinfo.header_offset = self.fp.tell() # Start of header bytes
self._writecheck(zinfo)
self._didModify = True
if isdir:
zinfo.file_size = 0
zinfo.compress_size = 0
zinfo.CRC = 0
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo
self.fp.write(zinfo.FileHeader())
return
with open(filename, "rb") as fp:
# Must overwrite CRC and sizes with correct data later
zinfo.CRC = CRC = 0
zinfo.compress_size = compress_size = 0
zinfo.file_size = file_size = 0
self.fp.write(zinfo.FileHeader())
if zinfo.compress_type == ZIP_DEFLATED:
cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
zlib.DEFLATED, -15)
else:
cmpr = None
while 1:
buf = fp.read(1024 * 8)
if not buf:
break
file_size = file_size + len(buf)
CRC = crc32(buf, CRC) & 0xffffffff
if cmpr:
buf = cmpr.compress(buf)
compress_size = compress_size + len(buf)
self.fp.write(buf)
if cmpr:
buf = cmpr.flush()
compress_size = compress_size + len(buf)
self.fp.write(buf)
zinfo.compress_size = compress_size
else:
zinfo.compress_size = file_size
zinfo.CRC = CRC
zinfo.file_size = file_size
# Seek backwards and write CRC and file sizes
position = self.fp.tell() # Preserve current position in file
self.fp.seek(zinfo.header_offset + 14, 0)
self.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size,
zinfo.file_size))
self.fp.seek(position, 0)
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo
def writestr(self, zinfo_or_arcname, data, compress_type=None):
"""Write a file into the archive. The contents is 'data', which
may be either a 'str' or a 'bytes' instance; if it is a 'str',
it is encoded as UTF-8 first.
'zinfo_or_arcname' is either a ZipInfo instance or
the name of the file in the archive."""
if isinstance(data, str) and sys.version_info[0] >= 3:
data = data.encode("utf-8")
if not isinstance(zinfo_or_arcname, ZipInfo):
zinfo = ZipInfo(filename=zinfo_or_arcname,
date_time=time.localtime(time.time())[:6])
zinfo.compress_type = self.compression
zinfo.external_attr = 0o600 << 16
else:
zinfo = zinfo_or_arcname
if not self.fp:
raise RuntimeError(
"Attempt to write to ZIP archive that was already closed")
zinfo.file_size = len(data) # Uncompressed size
zinfo.header_offset = self.fp.tell() # Start of header data
if compress_type is not None:
zinfo.compress_type = compress_type
self._writecheck(zinfo)
self._didModify = True
zinfo.CRC = crc32(data) & 0xffffffff # CRC-32 checksum
if zinfo.compress_type == ZIP_DEFLATED:
co = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
zlib.DEFLATED, -15)
data = co.compress(data) + co.flush()
zinfo.compress_size = len(data) # Compressed size
else:
zinfo.compress_size = zinfo.file_size
zinfo.header_offset = self.fp.tell() # Start of header data
self.fp.write(zinfo.FileHeader())
self.fp.write(data)
if zinfo.flag_bits & _FHF_HAS_DATA_DESCRIPTOR:
# Write CRC and file sizes after the file data
self.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size,
zinfo.file_size))
self.fp.flush()
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo
def _get_data_descriptor_size(self, zinfo):
if self.mode not in ("r", "a"):
raise RuntimeError('to read the data descriptor requires mode "r" or "a"')
if not zinfo.flag_bits & _FHF_HAS_DATA_DESCRIPTOR:
return 0
original_fp = self.fp.tell()
data_descriptor_fp = zinfo.header_offset + len(zinfo.FileHeader()) + zinfo.compress_size
self.fp.seek(data_descriptor_fp)
sig_or_not_bin = self.fp.read(4)
sig_or_not = struct.unpack('<L', sig_or_not_bin)
self.fp.seek(original_fp)
# The data descriptor can either have the signature or not, yet
# the standards don't specify the signature as an illegal CRC.
if sig_or_not == dataDescriptorSignature:
return 16
else:
return 12
def remove(self, member):
"""Remove a file from the archive. Only works if the ZipFile was opened
with mode 'a'."""
if "a" not in self.mode:
raise RuntimeError('remove() requires mode "a"')
if not self.fp:
raise RuntimeError(
"Attempt to modify ZIP archive that was already closed")
fp = self.fp
# Make sure we have an info object
if isinstance(member, ZipInfo):
# 'member' is already an info object
zinfo = member
else:
# Get info object for member
zinfo = self.getinfo(member)
# start at the pos of the first member (smallest offset)
position = min([info.header_offset for info in self.filelist]) # start at the beginning of first file
for info in self.filelist:
fileheader = info.FileHeader()
# is member after delete one?
if info.header_offset > zinfo.header_offset and info != zinfo:
# rewrite FileHeader and copy compressed data
# Skip the file header:
fp.seek(info.header_offset)
fheader = fp.read(sizeFileHeader)
if fheader[0:4] != stringFileHeader:
raise BadZipFile("Bad magic number for file header")
fheader = struct.unpack(structFileHeader, fheader)
fname = fp.read(fheader[_FH_FILENAME_LENGTH])
if fheader[_FH_EXTRA_FIELD_LENGTH]:
fp.read(fheader[_FH_EXTRA_FIELD_LENGTH])
if zinfo.flag_bits & 0x800:
# UTF-8 filename
fname_str = fname.decode("utf-8")
else:
fname_str = fname.decode("cp437")
if fname_str != info.orig_filename:
if not self._filePassed:
fp.close()
raise BadZipFile(
'File name in directory %r and header %r differ.'
% (zinfo.orig_filename, fname))
# read the actual data
data = fp.read(fheader[_FH_COMPRESSED_SIZE])
# modify info obj
info.header_offset = position
# jump to new position
fp.seek(info.header_offset, 0)
# write fileheader and data
fp.write(fileheader)
fp.write(data)
if zinfo.flag_bits & _FHF_HAS_DATA_DESCRIPTOR:
# Write CRC and file sizes after the file data
fp.write(struct.pack("<LLL", info.CRC, info.compress_size,
info.file_size))
# update position
fp.flush()
position = fp.tell()
elif info != zinfo:
# move to next position
position = position + info.compress_size + len(fileheader) + self._get_data_descriptor_size(info)
# Fix class members with state
self.start_dir = position
self._didModify = True
self.filelist.remove(zinfo)
del self.NameToInfo[zinfo.filename]
# write new central directory (includes truncate)
fp.seek(position, 0)
self._write_central_dir()
fp.seek(self.start_dir, 0) # jump to the beginning of the central directory, so it gets overridden at close()
def __del__(self):
"""Call the "close()" method in case the user forgot."""
self.close()
def _central_dir_header(self, zinfo):
dt = zinfo.date_time
dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
extra = []
if zinfo.file_size > ZIP64_LIMIT \
or zinfo.compress_size > ZIP64_LIMIT:
extra.append(zinfo.file_size)
extra.append(zinfo.compress_size)
file_size = 0xffffffff
compress_size = 0xffffffff
else:
file_size = zinfo.file_size
compress_size = zinfo.compress_size
if zinfo.header_offset > ZIP64_LIMIT:
extra.append(zinfo.header_offset)
header_offset = 0xffffffff
else:
header_offset = zinfo.header_offset
extra_data = zinfo.extra
if extra:
# Append a ZIP64 field to the extra's
extra_data = struct.pack(
'<HH' + 'Q'*len(extra),
1, 8*len(extra), *extra) + extra_data
extract_version = max(45, zinfo.extract_version)
create_version = max(45, zinfo.create_version)
else:
extract_version = zinfo.extract_version
create_version = zinfo.create_version
try:
filename, flag_bits = zinfo._encodeFilenameFlags()
centdir = struct.pack(structCentralDir,
stringCentralDir, create_version,
zinfo.create_system, extract_version, zinfo.reserved,
flag_bits, zinfo.compress_type, dostime, dosdate,
zinfo.CRC, compress_size, file_size,
len(filename), len(extra_data), len(zinfo.comment),
0, zinfo.internal_attr, zinfo.external_attr,
header_offset)
except DeprecationWarning:
print((structCentralDir, stringCentralDir, create_version,
zinfo.create_system, extract_version, zinfo.reserved,
zinfo.flag_bits, zinfo.compress_type, dostime, dosdate,
zinfo.CRC, compress_size, file_size,
len(zinfo.filename), len(extra_data), len(zinfo.comment),
0, zinfo.internal_attr, zinfo.external_attr,
header_offset), file=sys.stderr)
raise
return centdir + filename + extra_data + zinfo.comment
def _write_central_dir(self):
if self.fp is None:
return
if self.mode in ("w", "a"): # write ending records
count = 0
pos1 = self.fp.tell()
for zinfo in self.filelist: # write central directory
count = count + 1
self.fp.write(self._central_dir_header(zinfo))
pos2 = self.fp.tell()
# Write end-of-zip-archive record
centDirCount = count
centDirSize = pos2 - pos1
centDirOffset = pos1
if (centDirCount >= ZIP_FILECOUNT_LIMIT or
centDirOffset > ZIP64_LIMIT or
centDirSize > ZIP64_LIMIT):
# Need to write the ZIP64 end-of-archive records
zip64endrec = struct.pack(
structEndArchive64, stringEndArchive64,
44, 45, 45, 0, 0, centDirCount, centDirCount,
centDirSize, centDirOffset)
self.fp.write(zip64endrec)
zip64locrec = struct.pack(
structEndArchive64Locator,
stringEndArchive64Locator, 0, pos2, 1)
self.fp.write(zip64locrec)
centDirCount = min(centDirCount, 0xFFFF)
centDirSize = min(centDirSize, 0xFFFFFFFF)
centDirOffset = min(centDirOffset, 0xFFFFFFFF)
# check for valid comment length
if len(self.comment) >= ZIP_MAX_COMMENT:
if self.debug > 0:
msg = 'Archive comment is too long; truncating to %d bytes' \
% ZIP_MAX_COMMENT
self.comment = self.comment[:ZIP_MAX_COMMENT]
endrec = struct.pack(structEndArchive, stringEndArchive,
0, 0, centDirCount, centDirCount,
centDirSize, centDirOffset, len(self.comment))
self.fp.write(endrec)
self.fp.write(self.comment)
self.fp.flush()
self.fp.truncate()
def close(self):
"""Close the file, and for mode "w" and "a" write the ending
records."""
if self.fp is None:
return
if self.mode in ("w", "a") and self._didModify: # write ending records
self._write_central_dir()
if not self._filePassed:
self.fp.close()
self.fp = None
class PyZipFile(ZipFile):
"""Class to create ZIP archives with Python library files and packages."""
def __init__(self, file, mode="r", compression=ZIP_STORED,
allowZip64=False, optimize=-1):
ZipFile.__init__(self, file, mode=mode, compression=compression,
allowZip64=allowZip64)
self._optimize = optimize
def writepy(self, pathname, basename=""):
"""Add all files from "pathname" to the ZIP archive.
If pathname is a package directory, search the directory and
all package subdirectories recursively for all *.py and enter
the modules into the archive. If pathname is a plain
directory, listdir *.py and enter all modules. Else, pathname
must be a Python *.py file and the module will be put into the
archive. Added modules are always module.pyo or module.pyc.
This method will compile the module.py into module.pyc if
necessary.
"""
dir, name = os.path.split(pathname)
if os.path.isdir(pathname):
initname = os.path.join(pathname, "__init__.py")
if os.path.isfile(initname):
# This is a package directory, add it
if basename:
basename = "%s/%s" % (basename, name)
else:
basename = name
if self.debug:
print(u"Adding package in", pathname, "as", basename)
fname, arcname = self._get_codename(initname[0:-3], basename)
if self.debug:
print(u"Adding", arcname)
self.write(fname, arcname)
dirlist = os.listdir(pathname)
dirlist.remove("__init__.py")
# Add all *.py files and package subdirectories
for filename in dirlist:
path = os.path.join(pathname, filename)
root, ext = os.path.splitext(filename)
if os.path.isdir(path):
if os.path.isfile(os.path.join(path, "__init__.py")):
# This is a package directory, add it
self.writepy(path, basename) # Recursive call
elif ext == ".py":
fname, arcname = self._get_codename(path[0:-3],
basename)
if self.debug:
print(u"Adding", arcname)
self.write(fname, arcname)
else:
# This is NOT a package directory, add its files at top level
if self.debug:
print("Adding files from directory", pathname)
for filename in os.listdir(pathname):
path = os.path.join(pathname, filename)
root, ext = os.path.splitext(filename)
if ext == ".py":
fname, arcname = self._get_codename(path[0:-3],
basename)
if self.debug:
print(u"Adding", arcname)
self.write(fname, arcname)
else:
if pathname[-3:] != ".py":
raise RuntimeError(
'Files added with writepy() must end with ".py"')
fname, arcname = self._get_codename(pathname[0:-3], basename)
if self.debug:
print(u"Adding file", arcname)
self.write(fname, arcname)
def _get_codename(self, pathname, basename):
"""Return (filename, archivename) for the path.
Given a module name path, return the correct file path and
archive name, compiling if necessary. For example, given
/python/lib/string, return (/python/lib/string.pyc, string).
"""
def _compile(file, optimize=-1):
import py_compile
if self.debug:
print(u"Compiling", file)
try:
if sys.version_info[0] >= 3 and sys.version_info[1] >= 2:
py_compile.compile(file, doraise=True, optimize=optimize)
else:
py_compile.compile(file, doraise=True)
except py_compile.PyCompileError as error:
print(error.msg)
return False
return True
file_py = pathname + ".py"
file_pyc = pathname + ".pyc"
file_pyo = pathname + ".pyo"
if hasattr(imp, 'cache_from_source'):
pycache_pyc = imp.cache_from_source(file_py, True)
pycache_pyo = imp.cache_from_source(file_py, False)
else:
pycache_pyc = file_pyc
pycache_pyo = file_pyo
if self._optimize == -1:
# legacy mode: use whatever file is present
if (os.path.isfile(file_pyo) and
os.stat(file_pyo).st_mtime >= os.stat(file_py).st_mtime):
# Use .pyo file.
arcname = fname = file_pyo
elif (os.path.isfile(file_pyc) and
os.stat(file_pyc).st_mtime >= os.stat(file_py).st_mtime):
# Use .pyc file.
arcname = fname = file_pyc
elif (os.path.isfile(pycache_pyc) and
os.stat(pycache_pyc).st_mtime >= os.stat(file_py).st_mtime):
# Use the __pycache__/*.pyc file, but write it to the legacy pyc
# file name in the archive.
fname = pycache_pyc
arcname = file_pyc
elif (os.path.isfile(pycache_pyo) and
os.stat(pycache_pyo).st_mtime >= os.stat(file_py).st_mtime):
# Use the __pycache__/*.pyo file, but write it to the legacy pyo
# file name in the archive.
fname = pycache_pyo
arcname = file_pyo
else:
# Compile py into PEP 3147 pyc file.
if _compile(file_py):
fname = (pycache_pyc if __debug__ else pycache_pyo)
arcname = (file_pyc if __debug__ else file_pyo)
else:
fname = arcname = file_py
else:
# new mode: use given optimization level
if self._optimize == 0:
fname = pycache_pyc
arcname = file_pyc
else:
fname = pycache_pyo
arcname = file_pyo
if not (os.path.isfile(fname) and
os.stat(fname).st_mtime >= os.stat(file_py).st_mtime):
if not _compile(file_py, optimize=self._optimize):
fname = arcname = file_py
archivename = os.path.split(arcname)[1]
if basename:
archivename = "%s/%s" % (basename, archivename)
return (fname, archivename)
def main(args = None):
import textwrap
USAGE=textwrap.dedent("""\
Usage:
zipfile.py -l zipfile.zip # Show listing of a zipfile
zipfile.py -t zipfile.zip # Test if a zipfile is valid
zipfile.py -e zipfile.zip target # Extract zipfile into target dir
zipfile.py -c zipfile.zip src ... # Create zipfile from sources
""")
if args is None:
args = sys.argv[1:]
if not args or args[0] not in ('-l', '-c', '-e', '-t'):
print(USAGE)
sys.exit(1)
if args[0] == '-l':
if len(args) != 2:
print(USAGE)
sys.exit(1)
zf = ZipFile(args[1], 'r')
zf.printdir()
zf.close()
elif args[0] == '-t':
if len(args) != 2:
print(USAGE)
sys.exit(1)
zf = ZipFile(args[1], 'r')
badfile = zf.testzip()
if badfile:
print(u"The following enclosed file is corrupted: {!r}".format(badfile))
print(u"Done testing")
elif args[0] == '-e':
if len(args) != 3:
print(USAGE)
sys.exit(1)
zf = ZipFile(args[1], 'r')
out = args[2]
for path in zf.namelist():
if path.startswith('./'):
tgt = os.path.join(out, path[2:])
else:
tgt = os.path.join(out, path)
tgtdir = os.path.dirname(tgt)
if not os.path.exists(tgtdir):
os.makedirs(tgtdir)
with open(tgt, 'wb') as fp:
fp.write(zf.read(path))
zf.close()
elif args[0] == '-c':
if len(args) < 3:
print(USAGE)
sys.exit(1)
def addToZip(zf, path, zippath):
if os.path.isfile(path):
zf.write(path, zippath, ZIP_DEFLATED)
elif os.path.isdir(path):
for nm in os.listdir(path):
addToZip(zf,
os.path.join(path, nm), os.path.join(zippath, nm))
# else: ignore
zf = ZipFile(args[1], 'w', allowZip64=True)
for src in args[2:]:
addToZip(zf, src, os.path.basename(src))
zf.close()
if __name__ == "__main__":
main()
"""
modified version of the original Python zipfile unittest
Copyright (c) 2001-2016 Python Software Foundation; All Rights Reserved
"""
# 1. This LICENSE AGREEMENT is between the Python Software Foundation ("PSF"), and
# the Individual or Organization ("Licensee") accessing and otherwise using Python
# 2.7.11 software in source or binary form and its associated documentation.
#
# 2. Subject to the terms and conditions of this License Agreement, PSF hereby
# grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce,
# analyze, test, perform and/or display publicly, prepare derivative works,
# distribute, and otherwise use Python 2.7.11 alone or in any derivative
# version, provided, however, that PSF's License Agreement and PSF's notice of
# copyright, i.e., "Copyright (c) 2001-2016 Python Software Foundation; All Rights
# Reserved" are retained in Python 2.7.11 alone or in any derivative version
# prepared by Licensee.
#
# 3. In the event Licensee prepares a derivative work that is based on or
# incorporates Python 2.7.11 or any part thereof, and wants to make the
# derivative work available to others as provided herein, then Licensee hereby
# agrees to include in any such work a brief summary of the changes made to Python
# 2.7.11.
#
# 4. PSF is making Python 2.7.11 available to Licensee on an "AS IS" basis.
# PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED. BY WAY OF
# EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND DISCLAIMS ANY REPRESENTATION OR
# WARRANTY OF MERCHANTABILITY OR FITNESS FOR ANY PARTICULAR PURPOSE OR THAT THE
# USE OF PYTHON 2.7.11 WILL NOT INFRINGE ANY THIRD PARTY RIGHTS.
#
# 5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON 2.7.11
# FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS A RESULT OF
# MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON 2.7.11, OR ANY DERIVATIVE
# THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
#
# 6. This License Agreement will automatically terminate upon a material breach of
# its terms and conditions.
#
# 7. Nothing in this License Agreement shall be deemed to create any relationship
# of agency, partnership, or joint venture between PSF and Licensee. This License
# Agreement does not grant permission to use PSF trademarks or trade name in a
# trademark sense to endorse or promote products or services of Licensee, or any
# third party.
#
# 8. By copying, installing or otherwise using Python 2.7.11, Licensee agrees
# to be bound by the terms and conditions of this License Agreement.
# We can test part of the module without zlib.
try:
import zlib
except ImportError:
zlib = None
import io
import os
import sys
import imp
import time
import shutil
import struct
import unittest
import combinearchive.custom_zip as zipfile
from tempfile import TemporaryFile
from random import randint, random
from unittest import skipUnless
from tests.custom_support import TESTFN, run_unittest, findfile, unlink
TESTFN2 = TESTFN + "2"
TESTFNDIR = TESTFN + "d"
FIXEDTEST_SIZE = 1000
DATAFILES_DIR = 'zipfile_datafiles'
SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
('ziptest2dir/_ziptest2', 'qawsedrftg'),
('/ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
class TestsWithSourceFile(unittest.TestCase):
def setUp(self):
self.line_gen = (bytes("Zipfile test line %d. random float: %f" %
(i, random())).encode("ascii")
for i in range(FIXEDTEST_SIZE))
self.data = b'\n'.join(self.line_gen) + b'\n'
# Make a source file with some lines
with open(TESTFN, "wb") as fp:
fp.write(self.data)
def make_test_archive(self, f, compression):
# Create the ZIP archive
with zipfile.ZipFile(f, "w", compression) as zipfp:
zipfp.write(TESTFN, "another.name")
zipfp.write(TESTFN, TESTFN)
zipfp.writestr("strfile", self.data)
def zip_test(self, f, compression):
self.make_test_archive(f, compression)
# Read the ZIP archive
with zipfile.ZipFile(f, "r", compression) as zipfp:
self.assertEqual(zipfp.read(TESTFN), self.data)
self.assertEqual(zipfp.read("another.name"), self.data)
self.assertEqual(zipfp.read("strfile"), self.data)
# Print the ZIP directory
fp = io.StringIO()
zipfp.printdir(file=fp)
directory = fp.getvalue()
lines = directory.splitlines()
self.assertEqual(len(lines), 4) # Number of files + header
self.assertIn('File Name', lines[0])
self.assertIn('Modified', lines[0])
self.assertIn('Size', lines[0])
fn, date, time_, size = lines[1].split()
self.assertEqual(fn, 'another.name')
self.assertTrue(time.strptime(date, '%Y-%m-%d'))
self.assertTrue(time.strptime(time_, '%H:%M:%S'))
self.assertEqual(size, str(len(self.data)))
# Check the namelist
names = zipfp.namelist()
self.assertEqual(len(names), 3)
self.assertIn(TESTFN, names)
self.assertIn("another.name", names)
self.assertIn("strfile", names)
# Check infolist
infos = zipfp.infolist()
names = [i.filename for i in infos]
self.assertEqual(len(names), 3)
self.assertIn(TESTFN, names)
self.assertIn("another.name", names)
self.assertIn("strfile", names)
for i in infos:
self.assertEqual(i.file_size, len(self.data))
# check getinfo
for nm in (TESTFN, "another.name", "strfile"):
info = zipfp.getinfo(nm)
self.assertEqual(info.filename, nm)
self.assertEqual(info.file_size, len(self.data))
# Check that testzip doesn't raise an exception
zipfp.testzip()
if not isinstance(f, str):
f.close()
def test_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_STORED)
def zip_open_test(self, f, compression):
self.make_test_archive(f, compression)
# Read the ZIP archive
with zipfile.ZipFile(f, "r", compression) as zipfp:
zipdata1 = []
with zipfp.open(TESTFN) as zipopen1:
while True:
read_data = zipopen1.read(256)
if not read_data:
break
zipdata1.append(read_data)
zipdata2 = []
with zipfp.open("another.name") as zipopen2:
while True:
read_data = zipopen2.read(256)
if not read_data:
break
zipdata2.append(read_data)
self.assertEqual(b''.join(zipdata1), self.data)
self.assertEqual(b''.join(zipdata2), self.data)
if not isinstance(f, str):
f.close()
def test_open_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_open_test(f, zipfile.ZIP_STORED)
def test_open_via_zip_info(self):
# Create the ZIP archive
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
zipfp.writestr("name", "foo")
zipfp.writestr("name", "bar")
with zipfile.ZipFile(TESTFN2, "r") as zipfp:
infos = zipfp.infolist()
data = b""
for info in infos:
with zipfp.open(info) as zipopen:
data += zipopen.read()
self.assertTrue(data == b"foobar" or data == b"barfoo")
data = b""
for info in infos:
data += zipfp.read(info)
self.assertTrue(data == b"foobar" or data == b"barfoo")
def zip_random_open_test(self, f, compression):
self.make_test_archive(f, compression)
# Read the ZIP archive
with zipfile.ZipFile(f, "r", compression) as zipfp:
zipdata1 = []
with zipfp.open(TESTFN) as zipopen1:
while True:
read_data = zipopen1.read(randint(1, 1024))
if not read_data:
break
zipdata1.append(read_data)
self.assertEqual(b''.join(zipdata1), self.data)
if not isinstance(f, str):
f.close()
def test_random_open_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_random_open_test(f, zipfile.ZIP_STORED)
def test_univeral_readaheads(self):
f = io.BytesIO()
data = b'a\r\n' * 16 * 1024
zipfp = zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED)
zipfp.writestr(TESTFN, data)
zipfp.close()
data2 = b''
zipfp = zipfile.ZipFile(f, 'r')
with zipfp.open(TESTFN, 'rU') as zipopen:
for line in zipopen:
data2 += line
zipfp.close()
self.assertEqual(data, data2.replace(b'\n', b'\r\n'))
def zip_readline_read_test(self, f, compression):
self.make_test_archive(f, compression)
# Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r")
with zipfp.open(TESTFN) as zipopen:
data = b''
while True:
read = zipopen.readline()
if not read:
break
data += read
read = zipopen.read(100)
if not read:
break
data += read
self.assertEqual(data, self.data)
zipfp.close()
if not isinstance(f, str):
f.close()
def zip_readline_test(self, f, compression):
self.make_test_archive(f, compression)
# Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp:
with zipfp.open(TESTFN) as zipopen:
for line in self.line_gen:
linedata = zipopen.readline()
self.assertEqual(linedata, line + '\n')
if not isinstance(f, str):
f.close()
def zip_readlines_test(self, f, compression):
self.make_test_archive(f, compression)
# Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp:
with zipfp.open(TESTFN) as zipopen:
ziplines = zipopen.readlines()
for line, zipline in zip(self.line_gen, ziplines):
self.assertEqual(zipline, line + '\n')
if not isinstance(f, str):
f.close()
def zip_iterlines_test(self, f, compression):
self.make_test_archive(f, compression)
# Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp:
with zipfp.open(TESTFN) as zipopen:
for line, zipline in zip(self.line_gen, zipopen):
self.assertEqual(zipline, line + '\n')
if not isinstance(f, str):
f.close()
def test_readline_read_stored(self):
# Issue #7610: calls to readline() interleaved with calls to read().
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_readline_read_test(f, zipfile.ZIP_STORED)
def test_readline_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_readline_test(f, zipfile.ZIP_STORED)
def test_readlines_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_readlines_test(f, zipfile.ZIP_STORED)
def test_iterlines_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_iterlines_test(f, zipfile.ZIP_STORED)
@skipUnless(zlib, "requires zlib")
def test_deflated(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_DEFLATED)
@skipUnless(zlib, "requires zlib")
def test_open_deflated(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_open_test(f, zipfile.ZIP_DEFLATED)
@skipUnless(zlib, "requires zlib")
def test_random_open_deflated(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_random_open_test(f, zipfile.ZIP_DEFLATED)
@skipUnless(zlib, "requires zlib")
def test_readline_read_deflated(self):
# Issue #7610: calls to readline() interleaved with calls to read().
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_readline_read_test(f, zipfile.ZIP_DEFLATED)
@skipUnless(zlib, "requires zlib")
def test_readline_deflated(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_readline_test(f, zipfile.ZIP_DEFLATED)
@skipUnless(zlib, "requires zlib")
def test_readlines_deflated(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_readlines_test(f, zipfile.ZIP_DEFLATED)
@skipUnless(zlib, "requires zlib")
def test_iterlines_deflated(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_iterlines_test(f, zipfile.ZIP_DEFLATED)
@skipUnless(zlib, "requires zlib")
def test_low_compression(self):
"""Check for cases where compressed data is larger than original."""
# Create the ZIP archive
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
zipfp.writestr("strfile", '12')
# Get an open object for strfile
with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp:
with zipfp.open("strfile") as openobj:
self.assertEqual(openobj.read(1), b'1')
self.assertEqual(openobj.read(1), b'2')
def test_absolute_arcnames(self):
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
zipfp.write(TESTFN, "/absolute")
with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
self.assertEqual(zipfp.namelist(), ["absolute"])
def test_append_to_zip_file(self):
"""Test appending to an existing zipfile."""
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
zipfp.write(TESTFN, TESTFN)
with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
zipfp.writestr("strfile", self.data)
self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
def test_append_to_non_zip_file(self):
"""Test appending to an existing file that is not a zipfile."""
# NOTE: this test fails if len(d) < 22 because of the first
# line "fpin.seek(-22, 2)" in _EndRecData
data = b'I am not a ZipFile!' * 10
with open(TESTFN2, 'wb') as f:
f.write(data)
with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
zipfp.write(TESTFN, TESTFN)
with open(TESTFN2, 'rb') as f:
f.seek(len(data))
with zipfile.ZipFile(f, "r") as zipfp:
self.assertEqual(zipfp.namelist(), [TESTFN])
def test_write_default_name(self):
"""Check that calling ZipFile.write without arcname specified
produces the expected result."""
with zipfile.ZipFile(TESTFN2, "w") as zipfp:
zipfp.write(TESTFN)
with open(TESTFN, "rb") as f:
self.assertEqual(zipfp.read(TESTFN), f.read())
@skipUnless(zlib, "requires zlib")
def test_per_file_compression(self):
"""Check that files within a Zip archive can have different
compression options."""
with zipfile.ZipFile(TESTFN2, "w") as zipfp:
zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
sinfo = zipfp.getinfo('storeme')
dinfo = zipfp.getinfo('deflateme')
self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED)
self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED)
def test_write_to_readonly(self):
"""Check that trying to call write() on a readonly ZipFile object
raises a RuntimeError."""
with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
zipfp.writestr("somefile.txt", "bogus")
with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
self.assertRaises(RuntimeError, zipfp.write, TESTFN)
def test_extract(self):
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata)
with zipfile.ZipFile(TESTFN2, "r") as zipfp:
for fpath, fdata in SMALL_TEST_DATA:
writtenfile = zipfp.extract(fpath)
# make sure it was written to the right place
if os.path.isabs(fpath):
correctfile = os.path.join(os.getcwd(), fpath[1:])
else:
correctfile = os.path.join(os.getcwd(), fpath)
correctfile = os.path.normpath(correctfile)
self.assertEqual(writtenfile, correctfile)
# make sure correct data is in correct file
with open(writtenfile, "rb") as f:
self.assertEqual(fdata.encode(), f.read())
os.remove(writtenfile)
# remove the test file subdirectories
shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
def test_extract_all(self):
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata)
with zipfile.ZipFile(TESTFN2, "r") as zipfp:
zipfp.extractall()
for fpath, fdata in SMALL_TEST_DATA:
if os.path.isabs(fpath):
outfile = os.path.join(os.getcwd(), fpath[1:])
else:
outfile = os.path.join(os.getcwd(), fpath)
with open(outfile, "rb") as f:
self.assertEqual(fdata.encode(), f.read())
os.remove(outfile)
# remove the test file subdirectories
shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
def test_writestr_compression(self):
zipfp = zipfile.ZipFile(TESTFN2, "w")
zipfp.writestr("a.txt", "hello world", compress_type=zipfile.ZIP_STORED)
if zlib:
zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_DEFLATED)
info = zipfp.getinfo('a.txt')
self.assertEqual(info.compress_type, zipfile.ZIP_STORED)
if zlib:
info = zipfp.getinfo('b.txt')
self.assertEqual(info.compress_type, zipfile.ZIP_DEFLATED)
def zip_test_writestr_permissions(self, f, compression):
# Make sure that writestr creates files with mode 0600,
# when it is passed a name rather than a ZipInfo instance.
self.make_test_archive(f, compression)
with zipfile.ZipFile(f, "r") as zipfp:
zinfo = zipfp.getinfo('strfile')
self.assertEqual(zinfo.external_attr, 0o600 << 16)
if not isinstance(f, str):
f.close()
def test_writestr_permissions(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
def test_writestr_extended_local_header_issue1202(self):
with zipfile.ZipFile(TESTFN2, 'w') as orig_zip:
for data in 'abcdefghijklmnop':
zinfo = zipfile.ZipInfo(data)
zinfo.flag_bits |= 0x08 # Include an extended local header.
orig_zip.writestr(zinfo, data)
def test_close(self):
"""Check that the zipfile is closed after the 'with' block."""
with zipfile.ZipFile(TESTFN2, "w") as zipfp:
for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata)
self.assertTrue(zipfp.fp is not None, 'zipfp is not open')
self.assertTrue(zipfp.fp is None, 'zipfp is not closed')
with zipfile.ZipFile(TESTFN2, "r") as zipfp:
self.assertTrue(zipfp.fp is not None, 'zipfp is not open')
self.assertTrue(zipfp.fp is None, 'zipfp is not closed')
def test_close_on_exception(self):
"""Check that the zipfile is closed if an exception is raised in the
'with' block."""
with zipfile.ZipFile(TESTFN2, "w") as zipfp:
for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata)
try:
with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
raise zipfile.BadZipFile()
except zipfile.BadZipFile:
self.assertTrue(zipfp2.fp is None, 'zipfp is not closed')
def test_unicode_filenames(self):
# bug #10801
fname = findfile('zip_cp437_header.zip')
with zipfile.ZipFile(fname) as zipfp:
for name in zipfp.namelist():
zipfp.open(name).close()
def tearDown(self):
unlink(TESTFN)
unlink(TESTFN2)
class TestZip64InSmallFiles(unittest.TestCase):
# These tests test the ZIP64 functionality without using large files,
# see test_zipfile64 for proper tests.
def setUp(self):
line_gen = (bytes("Test of zipfile line %d." % i).encode("ascii")
for i in range(0, FIXEDTEST_SIZE))
self.data = b'\n'.join(line_gen)
# Make a source file with some lines
with open(TESTFN, "wb") as fp:
fp.write(self.data)
# modify ZIP64_LIMIT last, so if write fails it won't affect the other tests
self._limit = zipfile.ZIP64_LIMIT
zipfile.ZIP64_LIMIT = 5
def large_file_exception_test(self, f, compression):
with zipfile.ZipFile(f, "w", compression) as zipfp:
self.assertRaises(zipfile.LargeZipFile,
zipfp.write, TESTFN, "another.name")
def large_file_exception_test2(self, f, compression):
with zipfile.ZipFile(f, "w", compression) as zipfp:
self.assertRaises(zipfile.LargeZipFile,
zipfp.writestr, "another.name", self.data)
if not isinstance(f, str):
f.close()
def test_large_file_exception(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.large_file_exception_test(f, zipfile.ZIP_STORED)
self.large_file_exception_test2(f, zipfile.ZIP_STORED)
def zip_test(self, f, compression):
# Create the ZIP archive
with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
zipfp.write(TESTFN, "another.name")
zipfp.write(TESTFN, TESTFN)
zipfp.writestr("strfile", self.data)
# Read the ZIP archive
with zipfile.ZipFile(f, "r", compression) as zipfp:
self.assertEqual(zipfp.read(TESTFN), self.data)
self.assertEqual(zipfp.read("another.name"), self.data)
self.assertEqual(zipfp.read("strfile"), self.data)
# Print the ZIP directory
fp = io.StringIO()
zipfp.printdir(fp)
directory = fp.getvalue()
lines = directory.splitlines()
self.assertEqual(len(lines), 4) # Number of files + header
self.assertIn('File Name', lines[0])
self.assertIn('Modified', lines[0])
self.assertIn('Size', lines[0])
fn, date, time_, size = lines[1].split()
self.assertEqual(fn, 'another.name')
self.assertTrue(time.strptime(date, '%Y-%m-%d'))
self.assertTrue(time.strptime(time_, '%H:%M:%S'))
self.assertEqual(size, str(len(self.data)))
# Check the namelist
names = zipfp.namelist()
self.assertEqual(len(names), 3)
self.assertIn(TESTFN, names)
self.assertIn("another.name", names)
self.assertIn("strfile", names)
# Check infolist
infos = zipfp.infolist()
names = [i.filename for i in infos]
self.assertEqual(len(names), 3)
self.assertIn(TESTFN, names)
self.assertIn("another.name", names)
self.assertIn("strfile", names)
for i in infos:
self.assertEqual(i.file_size, len(self.data))
# check getinfo
for nm in (TESTFN, "another.name", "strfile"):
info = zipfp.getinfo(nm)
self.assertEqual(info.filename, nm)
self.assertEqual(info.file_size, len(self.data))
# Check that testzip doesn't raise an exception
zipfp.testzip()
if not isinstance(f, str):
f.close()
def test_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_STORED)
@skipUnless(zlib, "requires zlib")
def test_deflated(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_DEFLATED)
def test_absolute_arcnames(self):
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
allowZip64=True) as zipfp:
zipfp.write(TESTFN, "/absolute")
with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
self.assertEqual(zipfp.namelist(), ["absolute"])
def tearDown(self):
zipfile.ZIP64_LIMIT = self._limit
unlink(TESTFN)
unlink(TESTFN2)
class PyZipFileTests(unittest.TestCase):
def test_write_pyfile(self):
with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
fn = __file__
if fn.endswith('.pyc') or fn.endswith('.pyo'):
path_split = fn.split(os.sep)
if os.altsep is not None:
path_split.extend(fn.split(os.altsep))
if '__pycache__' in path_split:
fn = imp.source_from_cache(fn)
else:
fn = fn[:-1]
zipfp.writepy(fn)
bn = os.path.basename(fn)
self.assertNotIn(bn, zipfp.namelist())
self.assertTrue(bn + 'o' in zipfp.namelist() or
bn + 'c' in zipfp.namelist())
with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
fn = __file__
if fn.endswith(('.pyc', '.pyo')):
fn = fn[:-1]
zipfp.writepy(fn, "testpackage")
bn = "%s/%s" % ("testpackage", os.path.basename(fn))
self.assertNotIn(bn, zipfp.namelist())
self.assertTrue(bn + 'o' in zipfp.namelist() or
bn + 'c' in zipfp.namelist())
def test_write_python_package(self):
import email
packagedir = os.path.dirname(email.__file__)
with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
zipfp.writepy(packagedir)
# Check for a couple of modules at different levels of the
# hierarchy
names = zipfp.namelist()
self.assertTrue('email/__init__.pyo' in names or
'email/__init__.pyc' in names)
self.assertTrue('email/mime/text.pyo' in names or
'email/mime/text.pyc' in names)
def test_write_with_optimization(self):
import email
packagedir = os.path.dirname(email.__file__)
# use .pyc if running test in optimization mode,
# use .pyo if running test in debug mode
optlevel = 1 if __debug__ else 0
ext = '.pyo' if optlevel == 1 else '.pyc'
try:
with TemporaryFile() as t, \
zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp:
zipfp.writepy(packagedir)
names = zipfp.namelist()
self.assertIn('email/__init__' + ext, names)
self.assertIn('email/mime/text' + ext, names)
except IOError:
print "\n Cannot execute test_write_with_optimization due to insufficient read permissions for %s" \
% packagedir
def test_write_python_directory(self):
os.mkdir(TESTFN2)
try:
with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
fp.write("print(42)\n")
with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
fp.write("print(42 * 42)\n")
with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp:
fp.write("bla bla bla\n")
with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
zipfp.writepy(TESTFN2)
names = zipfp.namelist()
self.assertTrue('mod1.pyc' in names or 'mod1.pyo' in names)
self.assertTrue('mod2.pyc' in names or 'mod2.pyo' in names)
self.assertNotIn('mod2.txt', names)
finally:
shutil.rmtree(TESTFN2)
def test_write_non_pyfile(self):
with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
with open(TESTFN, 'w') as f:
f.write('most definitely not a python file')
self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
os.remove(TESTFN)
class OtherTests(unittest.TestCase):
zips_with_bad_crc = {
zipfile.ZIP_STORED: (
b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
b'ilehello,AworldP'
b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
b'\0\0/\0\0\0\0\0'),
zipfile.ZIP_DEFLATED: (
b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00'),
}
def test_unicode_filenames(self):
with zipfile.ZipFile(TESTFN, "w") as zf:
zf.writestr(u"foo.txt", "Test for unicode filename")
zf.writestr(u"\xf6.txt", "Test for unicode filename")
self.assertIsInstance(zf.infolist()[0].filename, str if sys.version_info[0] >= 3 else unicode)
with zipfile.ZipFile(TESTFN, "r") as zf:
self.assertEqual(zf.filelist[0].filename, u"foo.txt")
self.assertEqual(zf.filelist[1].filename, u"\xf6.txt")
def test_create_non_existent_file_for_append(self):
if os.path.exists(TESTFN):
os.unlink(TESTFN)
filename = 'testfile.txt'
content = b'hello, world. this is some content.'
try:
with zipfile.ZipFile(TESTFN, 'a') as zf:
zf.writestr(filename, content)
except IOError:
self.fail('Could not append data to a non-existent zip file.')
self.assertTrue(os.path.exists(TESTFN))
with zipfile.ZipFile(TESTFN, 'r') as zf:
self.assertEqual(zf.read(filename), content)
def test_close_erroneous_file(self):
# This test checks that the ZipFile constructor closes the file object
# it opens if there's an error in the file. If it doesn't, the
# traceback holds a reference to the ZipFile object and, indirectly,
# the file object.
# On Windows, this causes the os.unlink() call to fail because the
# underlying file is still open. This is SF bug #412214.
#
with open(TESTFN, "w") as fp:
fp.write("this is not a legal zip file\n")
try:
zf = zipfile.ZipFile(TESTFN)
except zipfile.BadZipFile:
pass
def test_is_zip_erroneous_file(self):
"""Check that is_zipfile() correctly identifies non-zip files."""
# - passing a filename
with open(TESTFN, "w") as fp:
fp.write("this is not a legal zip file\n")
chk = zipfile.is_zipfile(TESTFN)
self.assertFalse(chk)
# - passing a file object
with open(TESTFN, "rb") as fp:
chk = zipfile.is_zipfile(fp)
self.assertTrue(not chk)
# - passing a file-like object
fp = io.BytesIO()
fp.write(b"this is not a legal zip file\n")
chk = zipfile.is_zipfile(fp)
self.assertTrue(not chk)
fp.seek(0, 0)
chk = zipfile.is_zipfile(fp)
self.assertTrue(not chk)
def test_is_zip_valid_file(self):
"""Check that is_zipfile() correctly identifies zip files."""
# - passing a filename
with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
chk = zipfile.is_zipfile(TESTFN)
self.assertTrue(chk)
# - passing a file object
with open(TESTFN, "rb") as fp:
chk = zipfile.is_zipfile(fp)
self.assertTrue(chk)
fp.seek(0, 0)
zip_contents = fp.read()
# - passing a file-like object
fp = io.BytesIO()
fp.write(zip_contents)
chk = zipfile.is_zipfile(fp)
self.assertTrue(chk)
fp.seek(0, 0)
chk = zipfile.is_zipfile(fp)
self.assertTrue(chk)
def test_non_existent_file_raises_IOError(self):
# make sure we don't raise an AttributeError when a partially-constructed
# ZipFile instance is finalized; this tests for regression on SF tracker
# bug #403871.
# The bug we're testing for caused an AttributeError to be raised
# when a ZipFile instance was created for a file that did not
# exist; the .fp member was not initialized but was needed by the
# __del__() method. Since the AttributeError is in the __del__(),
# it is ignored, but the user should be sufficiently annoyed by
# the message on the output that regression will be noticed
# quickly.
self.assertRaises(IOError, zipfile.ZipFile, TESTFN)
def test_empty_file_raises_BadZipFile(self):
f = open(TESTFN, 'w')
f.close()
self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
with open(TESTFN, 'w') as fp:
fp.write("short file")
self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
def test_closed_zip_raises_RuntimeError(self):
"""Verify that testzip() doesn't swallow inappropriate exceptions."""
data = io.BytesIO()
with zipfile.ZipFile(data, mode="w") as zipf:
zipf.writestr("foo.txt", "O, for a Muse of Fire!")
# This is correct; calling .read on a closed ZipFile should throw
# a RuntimeError, and so should calling .testzip. An earlier
# version of .testzip would swallow this exception (and any other)
# and report that the first file in the archive was corrupt.
self.assertRaises(RuntimeError, zipf.read, "foo.txt")
self.assertRaises(RuntimeError, zipf.open, "foo.txt")
self.assertRaises(RuntimeError, zipf.testzip)
self.assertRaises(RuntimeError, zipf.writestr, "bogus.txt", "bogus")
with open(TESTFN, 'w') as f:
f.write('zipfile test data')
self.assertRaises(RuntimeError, zipf.write, TESTFN)
def test_bad_constructor_mode(self):
"""Check that bad modes passed to ZipFile constructor are caught."""
self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "q")
def test_bad_open_mode(self):
"""Check that bad modes passed to ZipFile.open are caught."""
with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.writestr("foo.txt", "O, for a Muse of Fire!")
with zipfile.ZipFile(TESTFN, mode="r") as zipf:
# read the data to make sure the file is there
zipf.read("foo.txt")
self.assertRaises(RuntimeError, zipf.open, "foo.txt", "q")
def test_read0(self):
"""Check that calling read(0) on a ZipExtFile object returns an empty
string and doesn't advance file pointer."""
with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.writestr("foo.txt", "O, for a Muse of Fire!")
# read the data to make sure the file is there
with zipf.open("foo.txt") as f:
for i in range(FIXEDTEST_SIZE):
self.assertEqual(f.read(0), b'')
self.assertEqual(f.read(), b"O, for a Muse of Fire!")
def test_open_non_existent_item(self):
"""Check that attempting to call open() for an item that doesn't
exist in the archive raises a RuntimeError."""
with zipfile.ZipFile(TESTFN, mode="w") as zipf:
self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
def test_bad_compression_mode(self):
"""Check that bad compression methods passed to ZipFile.open are
caught."""
self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "w", -1)
def test_null_byte_in_filename(self):
"""Check that a filename containing a null byte is properly
terminated."""
with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!")
self.assertEqual(zipf.namelist(), ['foo.txt'])
def test_struct_sizes(self):
"""Check that ZIP internal structure sizes are calculated correctly."""
self.assertEqual(zipfile.sizeEndCentDir, 22)
self.assertEqual(zipfile.sizeCentralDir, 46)
self.assertEqual(zipfile.sizeEndCentDir64, 56)
self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)
def test_comments(self):
"""Check that comments on the archive are handled properly."""
# check default comment is empty
with zipfile.ZipFile(TESTFN, mode="w") as zipf:
self.assertEqual(zipf.comment, b'')
zipf.writestr("foo.txt", "O, for a Muse of Fire!")
with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
self.assertEqual(zipfr.comment, b'')
# check a simple short comment
comment = b'Bravely taking to his feet, he beat a very brave retreat.'
with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.comment = comment
zipf.writestr("foo.txt", "O, for a Muse of Fire!")
with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
self.assertEqual(zipf.comment, comment)
# check a comment of max length
comment2 = ''.join(['%d' % (i ** 3 % 10) for i in range((1 << 16) - 1)])
comment2 = comment2.encode("ascii")
with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.comment = comment2
zipf.writestr("foo.txt", "O, for a Muse of Fire!")
with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
self.assertEqual(zipfr.comment, comment2)
# check a comment that is too long is truncated
with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.comment = comment2 + b'oops'
zipf.writestr("foo.txt", "O, for a Muse of Fire!")
with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
self.assertEqual(zipfr.comment, comment2)
def check_testzip_with_bad_crc(self, compression):
"""Tests that files with bad CRCs return their name from testzip."""
zipdata = self.zips_with_bad_crc[compression]
with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
# testzip returns the name of the first corrupt file, or None
self.assertEqual('afile', zipf.testzip())
def test_testzip_with_bad_crc_stored(self):
self.check_testzip_with_bad_crc(zipfile.ZIP_STORED)
@skipUnless(zlib, "requires zlib")
def test_testzip_with_bad_crc_deflated(self):
self.check_testzip_with_bad_crc(zipfile.ZIP_DEFLATED)
def check_read_with_bad_crc(self, compression):
"""Tests that files with bad CRCs raise a BadZipFile exception when read."""
zipdata = self.zips_with_bad_crc[compression]
# Using ZipFile.read()
with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile')
# Using ZipExtFile.read()
with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
with zipf.open('afile', 'r') as corrupt_file:
self.assertRaises(zipfile.BadZipFile, corrupt_file.read)
# Same with small reads (in order to exercise the buffering logic)
with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
with zipf.open('afile', 'r') as corrupt_file:
corrupt_file.MIN_READ_SIZE = 2
with self.assertRaises(zipfile.BadZipFile):
while corrupt_file.read(2):
pass
def test_read_with_bad_crc_stored(self):
self.check_read_with_bad_crc(zipfile.ZIP_STORED)
@skipUnless(zlib, "requires zlib")
def test_read_with_bad_crc_deflated(self):
self.check_read_with_bad_crc(zipfile.ZIP_DEFLATED)
def check_read_return_size(self, compression):
# Issue #9837: ZipExtFile.read() shouldn't return more bytes
# than requested.
for test_size in (1, 4095, 4096, 4097, 16384):
file_size = test_size + 1
junk = b''.join(struct.pack('B', randint(0, 255))
for x in range(file_size))
with zipfile.ZipFile(io.BytesIO(), "w", compression) as zipf:
zipf.writestr('foo', junk)
with zipf.open('foo', 'r') as fp:
buf = fp.read(test_size)
self.assertEqual(len(buf), test_size)
def test_read_return_size_stored(self):
self.check_read_return_size(zipfile.ZIP_STORED)
@skipUnless(zlib, "requires zlib")
def test_read_return_size_deflated(self):
self.check_read_return_size(zipfile.ZIP_DEFLATED)
def test_empty_zipfile(self):
# Check that creating a file in 'w' or 'a' mode and closing without
# adding any files to the archives creates a valid empty ZIP file
zipf = zipfile.ZipFile(TESTFN, mode="w")
zipf.close()
try:
zipf = zipfile.ZipFile(TESTFN, mode="r")
except zipfile.BadZipFile:
self.fail("Unable to create empty ZIP file in 'w' mode")
zipf = zipfile.ZipFile(TESTFN, mode="a")
zipf.close()
try:
zipf = zipfile.ZipFile(TESTFN, mode="r")
except:
self.fail("Unable to create empty ZIP file in 'a' mode")
def test_open_empty_file(self):
# Issue 1710703: Check that opening a file with less than 22 bytes
# raises a BadZipFile exception (rather than the previously unhelpful
# IOError)
f = open(TESTFN, 'w')
f.close()
self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r')
def tearDown(self):
unlink(TESTFN)
unlink(TESTFN2)
class DecryptionTests(unittest.TestCase):
"""Check that ZIP decryption works. Since the library does not
support encryption at the moment, we use a pre-generated encrypted
ZIP file."""
data = (
b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
b'\x00\x00L\x00\x00\x00\x00\x00' )
data2 = (
b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )
plain = b'zipfile.py encryption test'
plain2 = b'\x00' * 512
def setUp(self):
with open(TESTFN, "wb") as fp:
fp.write(self.data)
self.zip = zipfile.ZipFile(TESTFN, "r")
with open(TESTFN2, "wb") as fp:
fp.write(self.data2)
self.zip2 = zipfile.ZipFile(TESTFN2, "r")
def tearDown(self):
self.zip.close()
os.unlink(TESTFN)
self.zip2.close()
os.unlink(TESTFN2)
def test_no_password(self):
# Reading the encrypted file without password
# must generate a RunTime exception
self.assertRaises(RuntimeError, self.zip.read, "test.txt")
self.assertRaises(RuntimeError, self.zip2.read, "zero")
def test_bad_password(self):
self.zip.setpassword(b"perl")
self.assertRaises(RuntimeError, self.zip.read, "test.txt")
self.zip2.setpassword(b"perl")
self.assertRaises(RuntimeError, self.zip2.read, "zero")
@skipUnless(zlib, "requires zlib")
def test_good_password(self):
self.zip.setpassword(b"python")
self.assertEqual(self.zip.read("test.txt"), self.plain)
self.zip2.setpassword(b"12345")
self.assertEqual(self.zip2.read("zero"), self.plain2)
def test_unicode_password(self):
self.assertRaises(TypeError, self.zip.setpassword, u"unicode")
self.assertRaises(TypeError, self.zip.read, "test.txt", u"python")
self.assertRaises(TypeError, self.zip.open, "test.txt", pwd=u"python")
self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd=u"python")
class TestsWithRandomBinaryFiles(unittest.TestCase):
def setUp(self):
datacount = randint(16, 64) * 1024 + randint(1, 1024)
self.data = b''.join(struct.pack('<f', random() * randint(-1000, 1000))
for i in range(datacount))
# Make a source file with some lines
with open(TESTFN, "wb") as fp:
fp.write(self.data)
def tearDown(self):
unlink(TESTFN)
unlink(TESTFN2)
def make_test_archive(self, f, compression):
# Create the ZIP archive
with zipfile.ZipFile(f, "w", compression) as zipfp:
zipfp.write(TESTFN, "another.name")
zipfp.write(TESTFN, TESTFN)
def zip_test(self, f, compression):
self.make_test_archive(f, compression)
# Read the ZIP archive
with zipfile.ZipFile(f, "r", compression) as zipfp:
testdata = zipfp.read(TESTFN)
self.assertEqual(len(testdata), len(self.data))
self.assertEqual(testdata, self.data)
self.assertEqual(zipfp.read("another.name"), self.data)
if not isinstance(f, str):
f.close()
def test_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_STORED)
@skipUnless(zlib, "requires zlib")
def test_deflated(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_DEFLATED)
def zip_open_test(self, f, compression):
self.make_test_archive(f, compression)
# Read the ZIP archive
with zipfile.ZipFile(f, "r", compression) as zipfp:
zipdata1 = []
with zipfp.open(TESTFN) as zipopen1:
while True:
read_data = zipopen1.read(256)
if not read_data:
break
zipdata1.append(read_data)
zipdata2 = []
with zipfp.open("another.name") as zipopen2:
while True:
read_data = zipopen2.read(256)
if not read_data:
break
zipdata2.append(read_data)
testdata1 = b''.join(zipdata1)
self.assertEqual(len(testdata1), len(self.data))
self.assertEqual(testdata1, self.data)
testdata2 = b''.join(zipdata2)
self.assertEqual(len(testdata2), len(self.data))
self.assertEqual(testdata2, self.data)
if not isinstance(f, str):
f.close()
def test_open_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_open_test(f, zipfile.ZIP_STORED)
@skipUnless(zlib, "requires zlib")
def test_open_deflated(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_open_test(f, zipfile.ZIP_DEFLATED)
def zip_random_open_test(self, f, compression):
self.make_test_archive(f, compression)
# Read the ZIP archive
with zipfile.ZipFile(f, "r", compression) as zipfp:
zipdata1 = []
with zipfp.open(TESTFN) as zipopen1:
while True:
read_data = zipopen1.read(randint(1, 1024))
if not read_data:
break
zipdata1.append(read_data)
testdata = b''.join(zipdata1)
self.assertEqual(len(testdata), len(self.data))
self.assertEqual(testdata, self.data)
if not isinstance(f, str):
f.close()
def test_random_open_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_random_open_test(f, zipfile.ZIP_STORED)
@skipUnless(zlib, "requires zlib")
def test_random_open_deflated(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_random_open_test(f, zipfile.ZIP_DEFLATED)
@skipUnless(zlib, "requires zlib")
class TestsWithMultipleOpens(unittest.TestCase):
def setUp(self):
# Create the ZIP archive
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
zipfp.writestr('ones', '1' * FIXEDTEST_SIZE)
zipfp.writestr('twos', '2' * FIXEDTEST_SIZE)
def test_same_file(self):
# Verify that (when the ZipFile is in control of creating file objects)
# multiple open() calls can be made without interfering with each other.
with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
data1 = zopen1.read(500)
data2 = zopen2.read(500)
data1 += zopen1.read(500)
data2 += zopen2.read(500)
self.assertEqual(data1, data2)
def test_different_file(self):
# Verify that (when the ZipFile is in control of creating file objects)
# multiple open() calls can be made without interfering with each other.
with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
data1 = zopen1.read(500)
data2 = zopen2.read(500)
data1 += zopen1.read(500)
data2 += zopen2.read(500)
self.assertEqual(data1, b'1' * FIXEDTEST_SIZE)
self.assertEqual(data2, b'2' * FIXEDTEST_SIZE)
def test_interleaved(self):
# Verify that (when the ZipFile is in control of creating file objects)
# multiple open() calls can be made without interfering with each other.
with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
data1 = zopen1.read(500)
data2 = zopen2.read(500)
data1 += zopen1.read(500)
data2 += zopen2.read(500)
self.assertEqual(data1, b'1' * FIXEDTEST_SIZE)
self.assertEqual(data2, b'2' * FIXEDTEST_SIZE)
def tearDown(self):
unlink(TESTFN2)
class TestWithDirectory(unittest.TestCase):
def setUp(self):
os.mkdir(TESTFN2)
def test_extract_dir(self):
with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
zipf.extractall(TESTFN2)
self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))
def test_bug_6050(self):
# Extraction should succeed if directories already exist
os.mkdir(os.path.join(TESTFN2, "a"))
self.test_extract_dir()
def test_store_dir(self):
os.mkdir(os.path.join(TESTFN2, "x"))
zipf = zipfile.ZipFile(TESTFN, "w")
zipf.write(os.path.join(TESTFN2, "x"), "x")
self.assertTrue(zipf.filelist[0].filename.endswith("x/"))
def tearDown(self):
shutil.rmtree(TESTFN2)
if os.path.exists(TESTFN):
unlink(TESTFN)
class UniversalNewlineTests(unittest.TestCase):
def setUp(self):
self.line_gen = [bytes("Test of zipfile line %d." % i).encode("ascii")
for i in range(FIXEDTEST_SIZE)]
self.seps = ('\r', '\r\n', '\n')
self.arcdata, self.arcfiles = {}, {}
for n, s in enumerate(self.seps):
b = s.encode("ascii")
self.arcdata[s] = b.join(self.line_gen) + b
self.arcfiles[s] = '%s-%d' % (TESTFN, n)
f = open(self.arcfiles[s], "wb")
try:
f.write(self.arcdata[s])
finally:
f.close()
def make_test_archive(self, f, compression):
# Create the ZIP archive
with zipfile.ZipFile(f, "w", compression) as zipfp:
for fn in self.arcfiles.values():
zipfp.write(fn, fn)
def read_test(self, f, compression):
self.make_test_archive(f, compression)
# Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items():
with zipfp.open(fn, "rU") as fp:
zipdata = fp.read()
self.assertEqual(self.arcdata[sep], zipdata)
if not isinstance(f, str):
f.close()
def readline_read_test(self, f, compression):
self.make_test_archive(f, compression)
# Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items():
with zipfp.open(fn, "rU") as zipopen:
data = b''
while True:
read = zipopen.readline()
if not read:
break
data += read
read = zipopen.read(5)
if not read:
break
data += read
self.assertEqual(data, self.arcdata['\n'])
if not isinstance(f, str):
f.close()
def readline_test(self, f, compression):
self.make_test_archive(f, compression)
# Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items():
with zipfp.open(fn, "rU") as zipopen:
for line in self.line_gen:
linedata = zipopen.readline()
self.assertEqual(linedata, line + b'\n')
if not isinstance(f, str):
f.close()
def readlines_test(self, f, compression):
self.make_test_archive(f, compression)
# Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items():
with zipfp.open(fn, "rU") as fp:
ziplines = fp.readlines()
for line, zipline in zip(self.line_gen, ziplines):
self.assertEqual(zipline, line + b'\n')
if not isinstance(f, str):
f.close()
def iterlines_test(self, f, compression):
self.make_test_archive(f, compression)
# Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items():
with zipfp.open(fn, "rU") as fp:
for line, zipline in zip(self.line_gen, fp):
self.assertEqual(zipline, line + b'\n')
if not isinstance(f, str):
f.close()
def test_read_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.read_test(f, zipfile.ZIP_STORED)
def test_readline_read_stored(self):
# Issue #7610: calls to readline() interleaved with calls to read().
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readline_read_test(f, zipfile.ZIP_STORED)
def test_readline_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readline_test(f, zipfile.ZIP_STORED)
def test_readlines_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readlines_test(f, zipfile.ZIP_STORED)
def test_iterlines_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.iterlines_test(f, zipfile.ZIP_STORED)
@skipUnless(zlib, "requires zlib")
def test_read_deflated(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.read_test(f, zipfile.ZIP_DEFLATED)
@skipUnless(zlib, "requires zlib")
def test_readline_read_deflated(self):
# Issue #7610: calls to readline() interleaved with calls to read().
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readline_read_test(f, zipfile.ZIP_DEFLATED)
@skipUnless(zlib, "requires zlib")
def test_readline_deflated(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readline_test(f, zipfile.ZIP_DEFLATED)
@skipUnless(zlib, "requires zlib")
def test_readlines_deflated(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readlines_test(f, zipfile.ZIP_DEFLATED)
@skipUnless(zlib, "requires zlib")
def test_iterlines_deflated(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.iterlines_test(f, zipfile.ZIP_DEFLATED)
def tearDown(self):
for sep, fn in self.arcfiles.items():
os.remove(fn)
unlink(TESTFN)
unlink(TESTFN2)
class RemoveTests(unittest.TestCase):
def test_simple(self):
fname = "foo.txt"
# remove with fname
with zipfile.ZipFile(TESTFN, "w") as zf:
zf.writestr(fname, "just add a file with a name and some data")
self.assertEqual(zf.infolist()[0].filename, fname)
with zipfile.ZipFile(TESTFN, "a") as zf:
zf.remove(fname)
self.assertEqual(len(zf.infolist()), 0)
# remove with zipinfo
with zipfile.ZipFile(TESTFN, "w") as zf:
zf.writestr(fname, "just add a file with a name and some data")
self.assertEqual(zf.infolist()[0].filename, fname)
with zipfile.ZipFile(TESTFN, "a") as zf:
zinfo = zf.getinfo(fname)
zf.remove(zinfo)
self.assertEqual(len(zf.infolist()), 0)
def write_three_and_remove_one(self, index):
data_list = [bytes([randint(0, 255) for i in range(10)]) for i in range(3)]
fname_list = ["firstfile", "secondfile", "thirdfile"]
with zipfile.ZipFile(TESTFN, "w") as zf:
for fname, data in zip(fname_list, data_list):
zf.writestr(fname, data)
with zipfile.ZipFile(TESTFN, "a") as zf:
zf.remove(fname_list[index])
i_to_check = list(range(len(data_list)))
i_to_check.remove(index)
with zipfile.ZipFile(TESTFN, "r") as zf:
for i in i_to_check:
# the last reads fail if the pointers weren't corrected
self.assertEqual(zf.read(fname_list[i]), data_list[i])
def test_remove_middle(self):
self.write_three_and_remove_one(0)
self.write_three_and_remove_one(1)
self.write_three_and_remove_one(2)
def test_with_data_descriptor(self):
fname = "foo.txt"
data = "just add a file with a name and some data"
with zipfile.ZipFile(TESTFN, "w") as zf:
zinfo = zipfile.ZipInfo(fname)
zinfo.flag_bits = zinfo.flag_bits | zipfile._FHF_HAS_DATA_DESCRIPTOR
zf.writestr(zinfo, data)
with zipfile.ZipFile(TESTFN, "r") as zf:
zinfo = zf.getinfo(fname)
data_desc_size = zf._get_data_descriptor_size(zinfo)
zlen = len(zinfo.FileHeader()) + zinfo.compress_size + len(zf._central_dir_header(zinfo)) + data_desc_size
size = os.path.getsize(TESTFN)
with zipfile.ZipFile(TESTFN, "a") as zf:
zf.remove(fname)
new_size = os.path.getsize(TESTFN)
size_delta = size - new_size
self.assertEqual(size_delta, zlen)
def test_shrinks(self):
fname = "foo.txt"
data = "just add a file with a name and some data"
with zipfile.ZipFile(TESTFN, "w") as zf:
zf.writestr(fname, data)
zinfo = zf.getinfo(fname)
# ignoring descriptor size because ZipFile by default doesn't use it
zlen = len(zinfo.FileHeader()) + zinfo.compress_size + len(zf._central_dir_header(zinfo))
size = os.path.getsize(TESTFN)
with zipfile.ZipFile(TESTFN, "a") as zf:
zf.remove(fname)
new_size = os.path.getsize(TESTFN)
# size was 22 bytes vs 153 bytes on my machine btw
self.assertEqual(new_size, size - zlen)
def test_verifies_requirements(self):
fname = "foo.txt"
# test no remove on closed zipfile
with self.assertRaises(RuntimeError):
zf = zipfile.ZipFile(TESTFN, "w")
zf.writestr(fname, "just add a file with a name and some data")
zf.close()
zf.remove(fname)
# test no remove without "a"
with self.assertRaises(RuntimeError):
with zipfile.ZipFile(TESTFN, "w") as zf:
zf.writestr(fname, "just add a file with a name and some data")
zf.remove(fname)
def test_delete_add(self):
fname_list = ["foo.txt", "bar.txt", "blubb.bla", "sup.bro", "rock'n'roll"]
data_list = [''.join([chr(randint(0, 255)) for i in range(100)]) for i in range(len(fname_list))]
# add some files to the zip
with zipfile.ZipFile(TESTFN, "w") as zf:
for fname, data in zip(fname_list, data_list):
zf.writestr(fname, data)
for no in range(0, 3):
# remove the middle file and add it again
with zipfile.ZipFile(TESTFN, "a") as zf:
zf.remove(fname_list[no])
zf.writestr(fname_list[no], data_list[no])
# try to access prior deleted/added file and prior last file (which got moved, while delete)
with zipfile.ZipFile(TESTFN, "a") as zf:
for fname, data in zip(fname_list, data_list):
self.assertEqual(zf.read(fname), data)
def test_delete_add_no_close(self):
fname_list = ["foo.txt", "bar.txt", "blu.bla", "sup.bro", "rollah"]
data_list = [''.join([chr(randint(0, 255)) for i in range(randint(50, 150))]) for i in range(len(fname_list))]
# add some files to the zip
with zipfile.ZipFile(TESTFN, "w") as zf:
for fname, data in zip(fname_list, data_list):
zf.writestr(fname, data)
for no in range(0, 2):
with zipfile.ZipFile(TESTFN, "a") as zf:
# remove the middle file and add it again
zf.remove(fname_list[no])
zf.writestr(fname_list[no], data_list[no])
zf.remove(fname_list[no+1])
zf.writestr(fname_list[no+1], data_list[no+1])
# try to access prior deleted/added file and prior last file (which got moved, while delete)
for fname, data in zip(fname_list, data_list):
self.assertEqual(zf.read(fname), data)
def tearDown(self):
unlink(TESTFN)
def test_main():
run_unittest(TestsWithSourceFile, TestZip64InSmallFiles, OtherTests,
PyZipFileTests, DecryptionTests, TestsWithMultipleOpens,
TestWithDirectory, UniversalNewlineTests,
TestsWithRandomBinaryFiles, RemoveTests)
if __name__ == "__main__":
test_main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment