Skip to content

Instantly share code, notes, and snippets.

@pdjstone
Last active December 18, 2023 12:42
Show Gist options
  • Save pdjstone/29b7ea3455d05e637573c0f3c1bdffdf to your computer and use it in GitHub Desktop.
Save pdjstone/29b7ea3455d05e637573c0f3c1bdffdf to your computer and use it in GitHub Desktop.
Use rapidgzip to extract large ZIP entries by wrapping the deflate stream in a file-like object that provides the GZIP header and footer
import shutil
import sys
from io import BytesIO
from zipfile import ZipFile, ZIP_DEFLATED, ZipExtFile
import rapidgzip
class FileBase():
    """Minimal read-only, seekable file-like base class.

    Keeps a current position and implements ``seek``/``read``/``tell`` on top
    of one primitive, ``read_range(start, end)``, which subclasses override.
    Subclasses must also assign ``self.size`` (total length in bytes): this
    class reads that attribute but never sets it.
    """

    def __init__(self):
        self._pos = 0
        self._seekable = True
        self._closed = False

    @property
    def closed(self) -> bool:
        """True once ``close()`` has been called."""
        return self._closed

    def seek(self, offset, whence=0) -> int:
        """Move the position and return the new absolute position.

        ``whence`` 0 is relative to the start, 1 to the current position and
        2 to the end (``self.size``). Any other value is handled like 1.
        A resulting negative position is clamped to 0; positions past the end
        are allowed and simply make ``read`` return ``b""``.
        """
        if whence == 0:
            self._pos = 0
        elif whence == 1:
            pass
        elif whence == 2:
            self._pos = self.size
        self._pos += offset
        if self._pos < 0:
            self._pos = 0
        return self._pos

    def read(self, amt=-1) -> bytes:
        """Read up to ``amt`` bytes from the current position.

        ``amt`` of ``None`` or any negative number reads through to the end.
        Returns ``b""`` when the position is at or beyond ``self.size``.
        """
        if self._pos >= self.size:
            return b""
        # Accept None as "read everything", like the standard file objects do.
        if amt is None or amt < 0:
            end = self.size
        else:
            end = min(self._pos + amt, self.size)
        data = self.read_range(self._pos, end)
        # The position advances by the requested span, not by len(data).
        self._pos = end
        return data

    def read_range(self, start, end) -> bytes:
        """Return the bytes in ``[start, end)``; must be overridden."""
        raise NotImplementedError()

    def tell(self) -> int:
        """Return the current position."""
        return self._pos

    def seekable(self):
        """Always True: every FileBase supports ``seek``."""
        return True

    def readable(self):
        """Return True until the object has been closed."""
        return not self._closed

    def writable(self):
        """Always False: the object is read-only."""
        return False

    def write(self, data=None):
        """Writing is unsupported; always raises NotImplementedError."""
        raise NotImplementedError()

    def close(self):
        """Mark the object closed. Calling it again is a no-op."""
        if self._closed:
            return
        self._closed = True
class FileSegment(FileBase):
    """Read-only view of a byte window inside another file object.

    The window covers ``size`` bytes of ``parent_fd`` beginning at absolute
    position ``offset``. Positions passed to ``read_range`` are relative to
    the start of the window.
    """

    def __init__(self, parent_fd, offset, size):
        super().__init__()
        self.parent_fd, self.offset, self.size = parent_fd, offset, size

    def read_range(self, start, end):
        """Seek the parent to ``offset + start`` and read ``end - start`` bytes.

        No clamping against ``size`` happens here; the inherited ``read``
        limits the span before calling this method.
        """
        length = end - start
        absolute_start = self.offset + start
        self.parent_fd.seek(absolute_start)
        return self.parent_fd.read(length)
class GzipDeflateWrapper(FileBase):
    """Present a raw deflate stream as a gzip file.

    The virtual file is the concatenation of three parts: a fixed 10-byte
    gzip header, the ``compressed_size`` bytes served by ``deflate_fd``, and
    an 8-byte footer holding ``crc32`` followed by ``decompressed_size``
    truncated to 32 bits, both little-endian.
    """

    def __init__(self, deflate_fd, compressed_size, decompressed_size, crc32):
        super().__init__()
        self.size = compressed_size + 18
        # Magic 1f 8b, method 8 (deflate), then flag/mtime/extra-flag bytes
        # all zero and a final OS byte of 3.
        header = BytesIO(b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03')
        footer_bytes = (
            crc32.to_bytes(4, 'little')
            + (decompressed_size & 0xffffffff).to_bytes(4, 'little')
        )
        footer = BytesIO(footer_bytes)
        deflate_end = compressed_size + 10
        # Each entry: (file object, virtual start offset, virtual end offset).
        self._files = (
            (header, 0, 10),
            (deflate_fd, 10, deflate_end),
            (footer, deflate_end, compressed_size + 18),
        )

    def read_range(self, start, end) -> bytes:
        """Return virtual bytes ``[start, end)``, stitched across the parts."""
        pieces = []
        for source, source_pos, length in self._get_read_ranges(start, end):
            source.seek(source_pos)
            pieces.append(source.read(length))
        return b''.join(pieces)

    def _get_read_ranges(self, offset, end_offset):
        """Yield ``(file, position_in_file, length)`` for each part touched.

        Walks the parts in order, advancing a cursor from ``offset`` until it
        reaches ``end_offset``.
        """
        cursor = offset
        for source, part_start, part_end in self._files:
            if part_start <= cursor < part_end:
                # Stop at whichever comes first: end of this part or of the request.
                length = min(part_end, end_offset) - cursor
                yield source, cursor - part_start, length
                cursor += length
            if cursor == end_offset:
                return
class RapidZip(ZipFile):
    """Read-only ``ZipFile`` whose deflated entries are decoded by rapidgzip."""

    def __init__(self, file):
        super().__init__(file, 'r')

    def open(self, item):
        """Open an archive member for reading.

        ``item`` is a member name or a ``ZipInfo``. For a deflate-compressed
        member the raw compressed bytes are wrapped in a synthetic gzip
        container and handed to ``rapidgzip.open``; the returned object reads
        directly from this archive's underlying file (``self.fp``). For any
        other compression type a warning is printed and the regular
        ``ZipExtFile`` is returned.
        """
        if isinstance(item, str):
            item = super().getinfo(item)
        zef: ZipExtFile = super().open(item)
        if item.compress_type != ZIP_DEFLATED:
            print("Warning - not a deflate stream")
            return zef
        # The ZipExtFile is only needed to learn where the compressed data
        # starts (it has already parsed the local file header). Close it
        # explicitly instead of leaving its release to object finalization.
        # NOTE(review): these are private zipfile attributes; presumably only
        # present when the archive file is seekable - confirm.
        try:
            compress_start = zef._orig_compress_start
            compress_size = zef._orig_compress_size
        finally:
            zef.close()
        deflate_fd = FileSegment(self.fp, compress_start, compress_size)
        gzip_fd = GzipDeflateWrapper(deflate_fd, item.compress_size, item.file_size, item.CRC)
        return rapidgzip.open(gzip_fd, verbose=True)
# Command-line entry point: list the archive when no item is named, otherwise
# extract one item to `outfile` (defaults to the item name).
if __name__ == '__main__':
    if len(sys.argv) < 2:
        print(f'Usage: {sys.argv[0]} zipfile itemname [outfile]')
        sys.exit(-1)
    # The context manager closes the archive on every exit path, including
    # the sys.exit() below.
    with RapidZip(sys.argv[1]) as archive:
        if len(sys.argv) < 3:
            for n in archive.namelist():
                print(n)
            print('Please specify a file to extract')
            sys.exit(-1)
        out_name = item_name = sys.argv[2]
        if len(sys.argv) >= 4:
            out_name = sys.argv[3]
        # Open the archive entry first so that an unknown item name fails
        # before the output file is created or truncated.
        with archive.open(item_name) as item_fd, open(out_name, 'wb') as out_fd:
            shutil.copyfileobj(item_fd, out_fd)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment