Improved `zipfile` module
"""
This module is an improved `zipfile` module. Changes:
* fixed function for stripping the `extra` field of file header
+ added `compresslevel` support for LZMA compression
possible values: integer from 0 to 9 or a filter dict, like `{"dict_size": 67108864}`
https://docs.python.org/3/library/lzma.html#specifying-custom-filter-chains
+ when `ZipFile.store_if_smaller` is set to True and the compressed size is bigger than
the uncompressed size - scrap the compressed data and store file without compression;
+ added functions for writing/reading raw bytes or Python objects (anything that's
JSON serializable) to `extra` fields of file headers (up to 64KiB per file/folder)
"""
import importlib.util
import struct
import lzma
import json
import io
__all__ = [
    "ExtraTooLong",
    "BadZipFile",
    "ZIP_STORED",
    "ZIP_DEFLATED",
    "ZIP_BZIP2",
    "ZIP_LZMA",
    "is_zipfile",
    "ZipInfo",
    "ZipFile",
    "LargeZipFile",
]
"""
Import `zipfile` as `_z` and then monkey patch it while leaving `zipfile` intact
https://stackoverflow.com/a/11285504/2428152
"""
SPEC_ZIPFILE = importlib.util.find_spec("zipfile")
_z = importlib.util.module_from_spec(SPEC_ZIPFILE)
SPEC_ZIPFILE.loader.exec_module(_z)
del SPEC_ZIPFILE
BadZipFile = _z.BadZipFile
ZIP_STORED = _z.ZIP_STORED
ZIP_DEFLATED = _z.ZIP_DEFLATED
ZIP_BZIP2 = _z.ZIP_BZIP2
ZIP_LZMA = _z.ZIP_LZMA
is_zipfile = _z.is_zipfile
ZipInfo = _z.ZipInfo
LargeZipFile = _z.LargeZipFile
"""
Monkey patch broken function
https://bugs.python.org/issue44067
"""
def _strip_extra(extra, xids):
    # Remove extra-field records with the specified IDs. Each record is a
    # 2-byte header ID, a 2-byte data length and `xlen` bytes of data.
    unpack = _z._EXTRA_FIELD_STRUCT.unpack
    modified = False
    buffer = []
    i = 0
    while i + 4 <= len(extra):
        xid, xlen = unpack(extra[i : i + 4])
        j = i + 4 + xlen
        if xid in xids:
            modified = True
        else:
            buffer.append(extra[i:j])
        i = j
    if not modified:
        return extra
    return b"".join(buffer)

_z._strip_extra = _strip_extra
"""
Let LZMA compression use `compresslevel` - it can be integer from 0 to 9
or a dict with params specified at
https://docs.python.org/3/library/lzma.html#specifying-custom-filter-chains
"""
class _LZMACompressor(_z.LZMACompressor):
    def __init__(self, compresslevel):
        if isinstance(compresslevel, dict):
            self._props = compresslevel
        elif isinstance(compresslevel, int):
            self._props = {"preset": compresslevel}
        else:
            self._props = {}
        super().__init__()

    def _init(self):
        self._props.update({"id": lzma.FILTER_LZMA1})
        props = lzma._encode_filter_properties(self._props)
        self._comp = lzma.LZMACompressor(
            lzma.FORMAT_RAW,
            filters=[lzma._decode_filter_properties(lzma.FILTER_LZMA1, props)],
        )
        # Version info (9, 4) and the size of the filter properties,
        # as in the stock `zipfile.LZMACompressor`
        return struct.pack("<BBH", 9, 4, len(props)) + props
_get_compressor_original = _z._get_compressor

def _get_compressor(compress_type, compresslevel=None):
    if compress_type == ZIP_LZMA:
        return _LZMACompressor(compresslevel)
    else:
        return _get_compressor_original(compress_type, compresslevel)

_z._get_compressor = _get_compressor
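# A sketch of how the patched compressor is meant to be used; "big.zip" and
# "big_file.bin" are hypothetical names. `compresslevel` may be a preset
# (0-9) or a raw filter dict:
#
#     with ZipFile("big.zip", "w", compression=ZIP_LZMA,
#                  compresslevel={"dict_size": 64 * 1024 * 1024}) as zf:
#         zf.write("big_file.bin")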
(_EXTRA_BYTES_HEADER_ID,) = struct.unpack("<H", b"an") # 28257
(_EXTRA_MAX_SIZE,) = struct.unpack("<H", b"\xFF\xFF") # 65535
_ZIP64_MAX_EXTRA_SIZE = struct.calcsize("<HHQQQ") # 28
class ExtraTooLong(Exception):
    """
    Raised when the extra bytes are longer than the space available. Each file
    header can hold up to 64 KiB - 1 bytes, so more files in the archive means
    more space.
    """
def _get_extra_data(extra, extra_id):
    # Return the data of the first extra-field record with `extra_id`,
    # or None if there is no such record.
    unpack = _z._EXTRA_FIELD_STRUCT.unpack
    i = 0
    while i + 4 <= len(extra):
        xid, xlen = unpack(extra[i : i + 4])
        j = i + 4 + xlen
        if xid == extra_id:
            return extra[i + 4 : j]
        i = j
    return None
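# For reference, the extra field is just a sequence of such records: a
# little-endian 2-byte header ID, a 2-byte data length, then the data.
# A small illustration (0x6E61 == _EXTRA_BYTES_HEADER_ID):
#
#     record = struct.pack("<HH", 0x6E61, 5) + b"hello"
#     assert _get_extra_data(record, 0x6E61) == b"hello"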
class ZipFile(_z.ZipFile):
    def __init__(self, *args, store_if_smaller=False, **kwargs):
        self.store_if_smaller = store_if_smaller
        super().__init__(*args, **kwargs)

    def write(self, *args, **kwargs):
        super().write(*args, **kwargs)
        zinfo = self.filelist[-1]
        if self.store_if_smaller and zinfo.compress_size > zinfo.file_size:
            # Compression made the file bigger - drop the compressed data
            # and write the file again without compression
            self.start_dir = zinfo.header_offset
            self.fp.seek(zinfo.header_offset)
            self.fp.truncate()
            del self.NameToInfo[zinfo.filename]
            self.filelist.remove(zinfo)
            kwargs["compress_type"] = ZIP_STORED
            super().write(*args, **kwargs)
    def write_extra_bytes(self, extra_bytes, header_id=_EXTRA_BYTES_HEADER_ID):
        """
        Store `extra_bytes` in the `extra` fields of file headers in the
        central directory
        """
        for zip_info in self.filelist:
            # Strip the old data
            zip_info.extra = _strip_extra(zip_info.extra, (header_id,))
        for zip_info in self.filelist:
            if len(extra_bytes) == 0:
                break
            """
            Calculate the free space while leaving enough for the maximum
            size of the ZIP64 header and 4 bytes for our own header
            """
            free_space = (
                _EXTRA_MAX_SIZE
                - _ZIP64_MAX_EXTRA_SIZE
                - len(_strip_extra(zip_info.extra, (1,)))
                - _z._EXTRA_FIELD_STRUCT.size
            )
            chunk = extra_bytes[:free_space]
            chunk_size = len(chunk)
            zip_info.extra += _z._EXTRA_FIELD_STRUCT.pack(header_id, chunk_size) + chunk
            extra_bytes = extra_bytes[chunk_size:]
        if len(extra_bytes) != 0:
            raise ExtraTooLong("Extra bytes too long")
        self._didModify = True
        """
        `zipfile` rewrites the central directory but doesn't truncate the file,
        leading to a corrupted ZIP if the new extra bytes are shorter than the
        old ones
        """
        self.fp.seek(self.start_dir)
        self.fp.truncate()
    def write_metadata(self, obj, header_id=_EXTRA_BYTES_HEADER_ID):
        """
        Serialize `obj` to a JSON formatted str, compress it with `lzma` and
        write the resulting bytes with `write_extra_bytes`
        """
        compressed = io.BytesIO()
        with lzma.open(compressed, "wt") as lz:
            lz.write(json.dumps(obj))
        self.write_extra_bytes(compressed.getvalue(), header_id)
    def read_extra_bytes(self, header_id=_EXTRA_BYTES_HEADER_ID):
        """
        Read bytes written with `write_extra_bytes`
        """
        extra_bytes = []
        for zip_info in self.filelist:
            extra = _get_extra_data(zip_info.extra, header_id)
            if extra is None:
                break
            extra_bytes.append(extra)
        return b"".join(extra_bytes)
    def read_metadata(self, header_id=_EXTRA_BYTES_HEADER_ID):
        """
        Read an object written with `write_metadata`
        """
        compressed = io.BytesIO(self.read_extra_bytes(header_id))
        try:
            with lzma.open(compressed, "rt") as lz:
                return json.loads(lz.read())
        except (EOFError, lzma.LZMAError, json.JSONDecodeError):
            raise ValueError("Could not read the metadata")
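
if __name__ == "__main__":
    # Minimal usage sketch; every file name below is made up for illustration.
    # It shows `store_if_smaller`, LZMA compression and the metadata helpers
    # round-tripping a JSON-serializable object.
    import os
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, "payload.bin")
        with open(src, "wb") as f:
            f.write(os.urandom(1024))  # random bytes barely compress

        archive = os.path.join(tmp, "demo.zip")
        with ZipFile(
            archive, "w", compression=ZIP_LZMA, store_if_smaller=True
        ) as zf:
            zf.write(src, "payload.bin")
            zf.write_metadata({"source": "demo", "files": 1})

        with ZipFile(archive) as zf:
            info = zf.getinfo("payload.bin")
            # Random data grows under LZMA, so the file should end up stored
            print("compress_type:", info.compress_type, "(0 == ZIP_STORED)")
            print("metadata:", zf.read_metadata())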