Last active
December 18, 2023 12:42
-
-
Save pdjstone/29b7ea3455d05e637573c0f3c1bdffdf to your computer and use it in GitHub Desktop.
Use rapidgzip to extract large ZIP entries by wrapping the deflate stream in a file-like object that provides the GZIP header and footer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import shutil | |
import sys | |
from io import BytesIO | |
from zipfile import ZipFile, ZIP_DEFLATED, ZipExtFile | |
import rapidgzip | |
class FileBase():
    """Minimal read-only, seekable file-like base class.

    Subclasses must set ``self.size`` (total length in bytes) and implement
    ``read_range(start, end)``. This class keeps track of the current
    position and builds ``seek``/``read``/``tell`` on top of those two.
    """

    def __init__(self):
        self._pos = 0
        self._seekable = True
        self._closed = False

    @property
    def closed(self) -> bool:
        """True once ``close()`` has been called."""
        return self._closed

    def seek(self, offset, whence=0) -> int:
        """Move the read position and return the new absolute position.

        ``whence`` 0 is relative to the start, 2 is relative to ``self.size``;
        any other value is treated as relative to the current position.
        A resulting negative position is clamped to 0.
        """
        if whence == 0:
            base = 0
        elif whence == 2:
            base = self.size
        else:
            base = self._pos
        self._pos = max(0, base + offset)
        return self._pos

    def read(self, amt=-1) -> bytes:
        """Read up to ``amt`` bytes from the current position.

        A negative ``amt`` or ``None`` reads through to ``self.size``.
        Returns ``b""`` when the position is at or past the end.
        """
        if self._pos >= self.size:
            return b""
        # ``None`` means "read everything", as with the standard io classes;
        # comparing None with an int would otherwise raise TypeError.
        if amt is None or amt < 0:
            end = self.size
        else:
            end = min(self._pos + amt, self.size)
        data = self.read_range(self._pos, end)
        self._pos = end
        return data

    def read_range(self, start, end) -> bytes:
        """Return the bytes in ``[start, end)``; must be provided by subclasses."""
        raise NotImplementedError()

    def tell(self) -> int:
        """Return the current absolute position."""
        return self._pos

    def seekable(self):
        return True

    def readable(self):
        # Uses the private flag directly; the original referenced an
        # undefined ``self.closed`` attribute and raised AttributeError.
        return not self._closed

    def writable(self):
        return False

    def write(self, data=None):
        """Always raises NotImplementedError: the object is read-only."""
        raise NotImplementedError()

    def close(self):
        """Mark the object as closed; calling it again is a no-op."""
        if self._closed:
            return
        self._closed = True
class FileSegment(FileBase):
    """Read-only view of ``size`` bytes starting at ``offset`` inside ``parent_fd``."""

    def __init__(self, parent_fd, offset, size):
        super().__init__()
        self.parent_fd, self.offset, self.size = parent_fd, offset, size

    def read_range(self, start, end):
        """Read ``[start, end)`` of the segment by seeking the parent file.

        ``start`` and ``end`` are relative to the segment; ``self.offset`` is
        added to translate them into positions in ``parent_fd``.
        """
        source = self.parent_fd
        source.seek(self.offset + start)
        length = end - start
        return source.read(length)
class GzipDeflateWrapper(FileBase):
    """Expose a raw deflate stream as a gzip file.

    The virtual file is the concatenation of a fixed 10-byte gzip header,
    the ``compressed_size`` bytes of ``deflate_fd``, and an 8-byte trailer
    built from ``crc32`` and ``decompressed_size`` (both little-endian,
    the size truncated to 32 bits).
    """

    def __init__(self, deflate_fd, compressed_size, decompressed_size, crc32):
        super().__init__()
        header = b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03'
        trailer = (
            crc32.to_bytes(4, 'little')
            + (decompressed_size & 0xffffffff).to_bytes(4, 'little')
        )
        body_start = 10
        body_end = compressed_size + 10
        total = compressed_size + 18
        self.size = total
        # Each entry is (file object, start offset, end offset) in the
        # coordinates of the virtual gzip file, in ascending order.
        self._files = (
            (BytesIO(header), 0, body_start),
            (deflate_fd, body_start, body_end),
            (BytesIO(trailer), body_end, total),
        )

    def read_range(self, start, end) -> bytes:
        """Return bytes ``[start, end)`` of the virtual gzip file."""
        chunks = []
        for source, rel_pos, length in self._get_read_ranges(start, end):
            source.seek(rel_pos)
            chunks.append(source.read(length))
        return b''.join(chunks)

    def _get_read_ranges(self, offset, end_offset):
        """Yield ``(fd, position within fd, byte count)`` covering the range.

        Walks the parts in order, emitting one piece per part that the
        requested range overlaps, and stops once ``end_offset`` is reached.
        """
        cursor = offset
        for fd, part_start, part_end in self._files:
            if not (part_start <= cursor < part_end):
                continue
            take = min(part_end, end_offset) - cursor
            yield fd, cursor - part_start, take
            cursor += take
            if cursor == end_offset:
                return
class RapidZip(ZipFile):
    """Read-only ``ZipFile`` that hands deflate-compressed entries to rapidgzip."""

    def __init__(self, file):
        super().__init__(file, 'r')

    def open(self, item):
        """Open an archive member for reading.

        ``item`` is a member name or a ``ZipInfo``. Deflate entries are
        wrapped in a synthetic gzip stream and opened with
        ``rapidgzip.open``; any other compression type prints a warning and
        returns the regular ``ZipExtFile``.
        """
        if isinstance(item, str):
            item = super().getinfo(item)
        zef: ZipExtFile = super().open(item)
        if item.compress_type != ZIP_DEFLATED:
            print("Warning - not a deflate stream")
            return zef
        # The ZipExtFile is only needed to learn where the compressed data
        # begins. Close it once those values are captured so it does not
        # keep holding a reference on the archive's shared file handle
        # (which would stop ZipFile.close() from releasing the file).
        try:
            data_start = zef._orig_compress_start
            data_size = zef._orig_compress_size
        finally:
            zef.close()
        deflate_fd = FileSegment(self.fp, data_start, data_size)
        gzip_fd = GzipDeflateWrapper(deflate_fd, item.compress_size, item.file_size, item.CRC)
        return rapidgzip.open(gzip_fd, verbose=True)
if __name__ == '__main__':
    # Command-line entry point: list the archive's entries, or extract one.
    if len(sys.argv) < 2:
        print(f'Usage: {sys.argv[0]} zipfile itemname [outfile]')
        sys.exit(-1)
    # Named ``archive`` so the builtin ``zip`` is not shadowed; the ``with``
    # block closes the archive on every exit path, including sys.exit().
    with RapidZip(sys.argv[1]) as archive:
        if len(sys.argv) < 3:
            for n in archive.namelist():
                print(n)
            print('Please specify a file to extract')
            sys.exit(-1)
        out_name = item_name = sys.argv[2]
        if len(sys.argv) >= 4:
            out_name = sys.argv[3]
        # Open the archive entry before the output file so that a wrong
        # entry name fails without creating or truncating the output file.
        with archive.open(item_name) as item_fd, open(out_name, 'wb') as out_fd:
            shutil.copyfileobj(item_fd, out_fd)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment