Skip to content

Instantly share code, notes, and snippets.

@evansd
Last active May 22, 2024 13:01
Show Gist options
  • Save evansd/e4adb3063de4418512b3f5e5d80599ea to your computer and use it in GitHub Desktop.
Save evansd/e4adb3063de4418512b3f5e5d80599ea to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""
Stream the first file out of a ZIP file supplied as a stream of bytes.
This is a violation of the ZIP spec in that the canonical contents of a ZIP file are
given in the "Central Directory" which only appears at the end of the file. In theory,
ZIP files can have any arbitrary junk prepended to them and still be valid.
In practice, for the ZIP files we deal with, the file we want to extract is always the
very first thing in the file and this allows us to decompress the contents in a
streaming fashion as we would with, e.g, a gzipped file.
"""
import struct
import zipfile
def file_from_zip_stream(stream):
# Below based on the header-reading code from the `zipfile` module:
# https://github.com/python/cpython/blob/f2016280/Lib/zipfile/__init__.py#L1649-L1699
# Read and parse the header
header_bytes = stream.read(zipfile.sizeFileHeader)
header = struct.unpack(zipfile.structFileHeader, header_bytes)
if header[zipfile._FH_SIGNATURE] != zipfile.stringFileHeader:
raise zipfile.BadZipFile("Bad magic number for file header")
# Construct a ZipInfo object based on the contents of the header
zinfo = zipfile.ZipInfo()
zinfo.compress_type = header[zipfile._FH_COMPRESSION_METHOD]
zinfo.compress_size = header[zipfile._FH_COMPRESSED_SIZE]
zinfo.file_size = header[zipfile._FH_UNCOMPRESSED_SIZE]
zinfo.flag_bits = header[zipfile._FH_GENERAL_PURPOSE_FLAG_BITS]
# Check for unsupported features
if zinfo.flag_bits & zipfile._MASK_COMPRESSED_PATCH:
raise zipfile.BadZipFile("Unsupported feature: compressed patched data")
if zinfo.flag_bits & zipfile._MASK_ENCRYPTED:
raise zipfile.BadZipFile("Unsupported feature: encryption")
# Skip any additional metadata after the header
extra_metadata_size = (
header[zipfile._FH_FILENAME_LENGTH] + header[zipfile._FH_EXTRA_FIELD_LENGTH]
)
stream.read(extra_metadata_size)
return zipfile.ZipExtFile(stream, "rb", zinfo)
if __name__ == "__main__":
    # Command-line entry point: take a ZIP archive on stdin and write the
    # contents of its first member to stdout.
    import argparse
    import shutil
    import sys

    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--chunk-size", type=int, default=1024 * 1024)
    options = cli.parse_args()

    # Use the binary buffers underlying stdin/stdout; the copy moves at most
    # `--chunk-size` bytes per read.
    extracted = file_from_zip_stream(sys.stdin.buffer)
    shutil.copyfileobj(extracted, sys.stdout.buffer, options.chunk_size)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment