Created
June 15, 2020 13:45
-
-
Save alaniwi/a607e46f0fa5044f2c5fef532c205b0c to your computer and use it in GitHub Desktop.
modified ZipFile.write method
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def write(self, filename, arcname=None, compress_type=None, | |
fp=None, mtime=None, size=None, isdir=None, | |
mode=None): | |
"""Put the bytes from filename into the archive under the name | |
arcname.""" | |
if not self.fp: | |
raise RuntimeError( | |
"Attempt to write to ZIP archive that was already closed") | |
if isdir: | |
assert(mtime != None and size in (0, None) and fp == None) | |
size = 0 | |
elif fp: | |
assert(mtime != None and size != None and isdir == None) | |
isdir = False | |
else: | |
assert(mode == None) | |
st = os.stat(filename) | |
isdir = stat.S_ISDIR(st.st_mode) | |
mtime = time.localtime(st.st_mtime) | |
date_time = mtime[0:6] | |
# Create ZipInfo instance to store file information | |
if arcname is None: | |
arcname = filename | |
arcname = os.path.normpath(os.path.splitdrive(arcname)[1]) | |
while arcname[0] in (os.sep, os.altsep): | |
arcname = arcname[1:] | |
if isdir: | |
arcname += '/' | |
zinfo = ZipInfo(arcname, date_time) | |
zinfo.external_attr = (mode if mode != None else st[0] & 0xFFFF) << 16 # Unix attributes | |
if isdir: | |
zinfo.compress_type = ZIP_STORED | |
elif compress_type is None: | |
zinfo.compress_type = self.compression | |
else: | |
zinfo.compress_type = compress_type | |
zinfo.file_size = size if size != None else st.st_size | |
zinfo.flag_bits = 0x00 | |
zinfo.header_offset = self.fp.tell() # Start of header bytes | |
if zinfo.compress_type == ZIP_LZMA: | |
# Compressed data includes an end-of-stream (EOS) marker | |
zinfo.flag_bits |= 0x02 | |
self._writecheck(zinfo) | |
self._didModify = True | |
if isdir: | |
zinfo.file_size = 0 | |
zinfo.compress_size = 0 | |
zinfo.CRC = 0 | |
zinfo.external_attr |= 0x10 # MS-DOS directory flag | |
self.filelist.append(zinfo) | |
self.NameToInfo[zinfo.filename] = zinfo | |
self.fp.write(zinfo.FileHeader(False)) | |
return | |
cmpr = _get_compressor(zinfo.compress_type) | |
with fp or open(filename, "rb") as fp: | |
# Must overwrite CRC and sizes with correct data later | |
zinfo.CRC = CRC = 0 | |
zinfo.compress_size = compress_size = 0 | |
# Compressed size can be larger than uncompressed size | |
zip64 = self._allowZip64 and \ | |
zinfo.file_size * 1.05 > ZIP64_LIMIT | |
self.fp.write(zinfo.FileHeader(zip64)) | |
file_size = 0 | |
while 1: | |
buf = fp.read(1024 * 8) | |
if not buf: | |
break | |
file_size = file_size + len(buf) | |
CRC = crc32(buf, CRC) & 0xffffffff | |
if cmpr: | |
buf = cmpr.compress(buf) | |
compress_size = compress_size + len(buf) | |
self.fp.write(buf) | |
if cmpr: | |
buf = cmpr.flush() | |
compress_size = compress_size + len(buf) | |
self.fp.write(buf) | |
zinfo.compress_size = compress_size | |
else: | |
zinfo.compress_size = file_size | |
zinfo.CRC = CRC | |
zinfo.file_size = file_size | |
if not zip64 and self._allowZip64: | |
if file_size > ZIP64_LIMIT: | |
raise RuntimeError('File size has increased during compressing') | |
if compress_size > ZIP64_LIMIT: | |
raise RuntimeError('Compressed size larger than uncompressed size') | |
# Seek backwards and write file header (which will now include | |
# correct CRC and file sizes) | |
position = self.fp.tell() # Preserve current position in file | |
self.fp.seek(zinfo.header_offset, 0) | |
self.fp.write(zinfo.FileHeader(zip64)) | |
self.fp.seek(position, 0) | |
self.filelist.append(zinfo) | |
self.NameToInfo[zinfo.filename] = zinfo |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment