Skip to content

Instantly share code, notes, and snippets.

@fletom
Created October 21, 2014 18:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fletom/8162d6ebcaf3ece98e92 to your computer and use it in GitHub Desktop.
Dynamically create a zip archive in Python with iterable input and iterable output.
#!/usr/bin/env python
"""
Iterable ZIP archive generator.
Modified from https://github.com/SpiderOak/ZipStream.
"""
import struct, os, sys
import binascii
import time

try:
    import zlib # We may need its compression method
except ImportError:
    # zlib is optional; ZIP_STORED (no compression) still works without it.
    zlib = None

__all__ = ["ZIP_STORED", "ZIP_DEFLATED", "ZipStream"]

# Sizes/offsets above this need the ZIP64 extensions (2**31 - 1).
ZIP64_LIMIT= (1 << 31) - 1

# constants for Zip file compression methods
ZIP_STORED = 0
ZIP_DEFLATED = 8
# Other ZIP compression methods not supported

# Here are some struct module formats for reading headers
STRUCT_END_ARCHIVE = "<4s4H2lH" # 9 items, end of archive, 22 bytes
STRING_END_ARCHIVE = "PK\005\006" # magic number for end of archive record
STRUCT_CENTRAL_DIR = "<4s4B4HlLL5HLl"# 19 items, central directory, 46 bytes
STRING_CENTRAL_DIR = "PK\001\002" # magic number for central directory
STRUCT_FILE_HEADER = "<4s2B4HlLL2H" # 12 items, file header record, 30 bytes
STRING_FILE_HEADER = "PK\003\004" # magic number for file header
STRUCT_END_ARCHIVE_64_LOCATOR = "<4slql" # 4 items, locate Zip64 header, 20 bytes
STRING_END_ARCHIVE_64_LOCATOR = "PK\x06\x07" # magic token for locator header
STRUCT_END_ARCHIVE_64 = "<4sqhhllqqqq" # 10 items, end of archive (Zip64), 56 bytes
STRING_END_ARCHIVE_64 = "PK\x06\x06" # magic token for Zip64 header
STRING_DATA_DESCRIPTOR = "PK\x07\x08" # magic number for data descriptor

# indexes of entries in the central directory structure
_CD_SIGNATURE = 0
_CD_CREATE_VERSION = 1
_CD_CREATE_SYSTEM = 2
_CD_EXTRACT_VERSION = 3
_CD_EXTRACT_SYSTEM = 4 # is this meaningful?
_CD_FLAG_BITS = 5
_CD_COMPRESS_TYPE = 6
_CD_TIME = 7
_CD_DATE = 8
_CD_CRC = 9
_CD_COMPRESSED_SIZE = 10
_CD_UNCOMPRESSED_SIZE = 11
_CD_FILENAME_LENGTH = 12
_CD_EXTRA_FIELD_LENGTH = 13
_CD_COMMENT_LENGTH = 14
_CD_DISK_NUMBER_START = 15
_CD_INTERNAL_FILE_ATTRIBUTES = 16
_CD_EXTERNAL_FILE_ATTRIBUTES = 17
_CD_LOCAL_HEADER_OFFSET = 18

# indexes of entries in the local file header structure
_FH_SIGNATURE = 0
_FH_EXTRACT_VERSION = 1
_FH_EXTRACT_SYSTEM = 2 # is this meaningful?
_FH_GENERAL_PURPOSE_FLAG_BITS = 3
_FH_COMPRESSION_METHOD = 4
_FH_LAST_MOD_TIME = 5
_FH_LAST_MOD_DATE = 6
_FH_CRC = 7
_FH_COMPRESSED_SIZE = 8
_FH_UNCOMPRESSED_SIZE = 9
_FH_FILENAME_LENGTH = 10
_FH_EXTRA_FIELD_LENGTH = 11

# What should these actually be?
DEFAULT_ST_MODE = 33188 # oct(33188) == '0100644'

# Use the current time when we don't know the files' actual modification times.
# Unfortunately, ZIP dates and times are stored in local time, not UTC.
# This means that the files will extract differently depending on the timezone of the user.
DEFAULT_MTIME = time.localtime()
class ZipInfo(object):
    """Class with attributes describing each file in the ZIP archive.

    Header-format constants (STRUCT_*/STRING_*) and ZIP64_LIMIT are the
    module-level values defined above.  Instances are created by
    ZipStream.zip_file() and later re-read by ZipStream.archive_footer()
    to emit the central directory.
    """
    # Class-level defaults shared by every entry; per-instance state lives
    # in __slots__ below.
    create_version = 20 # "version made by" (2.0)
    extract_version = 20 # minimum version needed to extract (2.0)
    comment = "" # Comment for each file
    extra = "" # ZIP extra data
    reserved = 0 # Must be zero
    flag_bits = 0x08 # ZIP flag bits, bit 3 indicates presence of data descriptor
    volume = 0 # Volume number of file header
    internal_attr = 0 # Internal attributes
    __slots__ = (
        'file_name',
        'mtime',
        'compression',
        'create_system',
        'external_attr',
        'header_offset',
        'CRC',
        'compress_size',
        'file_size',
    )
    def __init__(self, file_name, external_attr, compression, header_offset, mtime):
        """Record one archive member's metadata.

        file_name -- path stored in the archive (normalized to "/" separators)
        external_attr -- external file attributes (Unix mode in high 16 bits)
        compression -- ZIP_STORED or ZIP_DEFLATED
        header_offset -- byte offset of this member's local file header
        mtime -- time tuple (year, month, day, hour, min, sec, ...)

        Raises ValueError if file_name contains a null byte.
        """
        self.header_offset = header_offset # Byte offset to the file header
        self.external_attr = external_attr # External file attributes
        self.compression = compression # ZIP_STORED or ZIP_DEFLATED
        # Terminate the file name at the first null byte. Null bytes in file
        # names are used as tricks by viruses in archives.
        if '\0' in file_name:
            raise ValueError("Filenames cannot contain the null byte.")
        # This is used to ensure paths in generated ZIP files always use
        # forward slashes as the directory separator, as required by the
        # ZIP format specification.
        if os.sep != "/" and os.sep in file_name:
            file_name = file_name.replace(os.sep, "/")
        self.file_name = file_name # Normalized file name
        self.mtime = mtime # year, month, day, hour, min, sec
        if sys.platform == 'win32':
            self.create_system = 0 # System which created ZIP archive
        else:
            # Assume everything else is unix-y
            self.create_system = 3 # System which created ZIP archive
        ## The following are set by the ZipStream class.
        # CRC-32 of the uncompressed file
        self.CRC = 0
        # Size of the compressed file
        self.compress_size = 0
        # Size of the uncompressed file
        self.file_size = 0
    def data_descriptor(self):
        """Return the packed data descriptor record (written after the file
        data, since sizes/CRC are unknown when the local header is emitted).

        Uses 8-byte size fields when either size exceeds ZIP64_LIMIT,
        4-byte fields otherwise.
        """
        if self.compress_size > ZIP64_LIMIT or self.file_size > ZIP64_LIMIT:
            fmt = "<4slQQ"
        else:
            fmt = "<4slLL"
        return struct.pack(fmt, STRING_DATA_DESCRIPTOR, self.CRC, self.compress_size, self.file_size)
    @property
    def dosdate(self):
        """Modification date packed into the 16-bit MS-DOS format
        (7 bits year-since-1980, 4 bits month, 5 bits day)."""
        dt = self.mtime
        return (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
    @property
    def dostime(self):
        """Modification time packed into the 16-bit MS-DOS format
        (5 bits hour, 6 bits minute, 5 bits seconds/2)."""
        dt = self.mtime
        return dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
    def file_header(self):
        """Return the per-file header as a string."""
        # Set these to zero because we write them after the file data
        # (in the data descriptor record -- see flag_bits bit 3).
        CRC = compress_size = file_size = 0
        header = struct.pack(
            STRUCT_FILE_HEADER,
            STRING_FILE_HEADER,
            self.extract_version,
            self.reserved,
            self.flag_bits,
            self.compression,
            self.dostime,
            self.dosdate,
            CRC,
            compress_size,
            file_size,
            len(self.file_name),
            len(self.extra),
        )
        return header + self.file_name + self.extra
class ZipStream(object):
    """
    Takes an iterable of (filepath, fileobj, [st_mode], [mtime]) tuples and lets you stream the zipped result.

    Iterating over a ZipStream yields successive chunks of ZIP archive
    data: for each input file its local header, (optionally deflated)
    data and data descriptor, and finally the archive footer (central
    directory + end-of-central-directory records).
    """
    def __init__(self, files_iterable, compression = ZIP_DEFLATED):
        """
        files_iterable -- iterable of (filepath, fileobj[, st_mode[, mtime]])
        compression -- ZIP_STORED or ZIP_DEFLATED

        Raises RuntimeError for unsupported methods, or if ZIP_DEFLATED
        is requested but zlib is not available.
        """
        if compression not in (ZIP_STORED, ZIP_DEFLATED):
            raise RuntimeError("That compression method is not supported.")
        if compression == ZIP_DEFLATED and not zlib:
            raise RuntimeError("Compression requires the (missing) zlib module.")
        self.zip_infos = [] # List of ZipInfo instances for archive
        self.compression = compression # Method of compression
        self.files = files_iterable
        self.data_ptr = 0 # Keep track of location inside archive
    def __iter__(self):
        """Generate the archive: every file's data, then the footer."""
        for file in self.files:
            for data in self.zip_file(*file):
                yield data
        yield self.archive_footer()
    def update_data_ptr(self, data):
        """
        As data is added to the archive, update a pointer so we can determine
        the location of various structures as they are generated.
        data -- data to be added to archive
        Returns data
        """
        self.data_ptr += len(data)
        return data
    def zip_file(self, file_name, file_obj, st_mode = DEFAULT_ST_MODE, mtime = DEFAULT_MTIME):
        """
        Generates the data to add a file to an archive.
        file_name -- the relative file path
        file_obj -- the file object to read from
        st_mode (optional) -- the st_mode (as returned by os.stat)
        mtime (optional) -- modification time tuple (year, month, day, hour, min, sec)
        This function generates the data corresponding to the fields:
        [local file header n]
        [file data n]
        [data descriptor n]
        as described in section V. of the PKZIP Application Note:
        http://www.pkware.com/business_and_developers/developer/appnote/
        """
        # Read from the beginning of the file.
        file_obj.seek(0)
        # Create ZipInfo instance to store file information.
        # The Unix permission bits live in the high 16 bits of the
        # external-attributes field.
        external_attr = (st_mode & 0xFFFF) << 16 # Unix attributes
        zinfo = ZipInfo(
            file_name = file_name,
            external_attr = external_attr,
            compression = self.compression,
            header_offset = self.data_ptr, # Start of header bytes
            mtime = mtime,
        )
        CRC = 0
        compress_size = 0
        file_size = 0
        yield self.update_data_ptr(zinfo.file_header())
        if self.compression == ZIP_DEFLATED:
            # Negative window bits -> raw deflate stream (no zlib wrapper),
            # as required inside ZIP members.
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        else:
            cmpr = None
        while True:
            # 8 KiB chunks; the size is an arbitrary buffering choice and
            # does not affect the archive produced.
            buf = file_obj.read(1024 * 8)
            if not buf:
                break
            file_size += len(buf)
            # Running CRC-32 over the *uncompressed* data.
            CRC = binascii.crc32(buf, CRC)
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size += len(buf)
            yield self.update_data_ptr(buf)
        if cmpr:
            # Flush whatever the compressor is still holding back.
            buf = cmpr.flush()
            compress_size += len(buf)
            yield self.update_data_ptr(buf)
        else:
            # Stored (uncompressed): sizes are identical.
            compress_size = file_size
        zinfo.compress_size = compress_size
        zinfo.CRC = CRC
        zinfo.file_size = file_size
        # Sizes/CRC were zero in the local header, so emit them now in the
        # trailing data descriptor (flag bit 3).
        yield self.update_data_ptr(zinfo.data_descriptor())
        # Remember the entry for the central directory in archive_footer().
        self.zip_infos.append(zinfo)
    def archive_footer(self):
        """
        Returns data to finish off an archive based on the files already
        added via zip_file(...). The data returned corresponds to the fields:
        [archive decryption header]
        [archive extra data record]
        [central directory]
        [zip64 end of central directory record]
        [zip64 end of central directory locator]
        [end of central directory record]
        as described in section V. of the PKZIP Application Note:
        http://www.pkware.com/business_and_developers/developer/appnote/
        """
        data = []
        count = 0
        pos1 = self.data_ptr # Start of the central directory
        for zinfo in self.zip_infos: # write central directory
            count += 1
            extra = []
            if zinfo.file_size > ZIP64_LIMIT or zinfo.compress_size > ZIP64_LIMIT:
                # Real sizes go into a ZIP64 extra field; the fixed-width
                # fields hold the "look in ZIP64" sentinel.
                extra.append(zinfo.file_size)
                extra.append(zinfo.compress_size)
                file_size = 0xffffffff # -1
                compress_size = 0xffffffff # -1
            else:
                file_size = zinfo.file_size
                compress_size = zinfo.compress_size
            if zinfo.header_offset > ZIP64_LIMIT:
                extra.append(zinfo.header_offset)
                header_offset = -1 # struct "l" format: 32 one bits
            else:
                header_offset = zinfo.header_offset
            extra_data = zinfo.extra
            if extra:
                # Append a ZIP64 field to the extra's (header ID 1, then the
                # 8-byte values collected above).
                extra_data = struct.pack('<hh' + 'q' * len(extra), 1, 8 * len(extra), *extra) + extra_data
                # ZIP64 requires at least version 4.5 to extract.
                extract_version = max(45, zinfo.extract_version)
                create_version = max(45, zinfo.create_version)
            else:
                extract_version = zinfo.extract_version
                create_version = zinfo.create_version
            centdir = struct.pack(
                STRUCT_CENTRAL_DIR,
                STRING_CENTRAL_DIR,
                create_version,
                zinfo.create_system,
                extract_version,
                zinfo.reserved,
                zinfo.flag_bits,
                zinfo.compression,
                zinfo.dostime,
                zinfo.dosdate,
                zinfo.CRC,
                compress_size,
                file_size,
                len(zinfo.file_name),
                len(extra_data),
                len(zinfo.comment),
                0, # "disk number start" -- always 0, we never split archives
                zinfo.internal_attr,
                zinfo.external_attr,
                header_offset
            )
            data.append(self.update_data_ptr(centdir))
            data.append(self.update_data_ptr(zinfo.file_name))
            data.append(self.update_data_ptr(extra_data))
            data.append(self.update_data_ptr(zinfo.comment))
        pos2 = self.data_ptr # End of the central directory
        # Write end-of-zip-archive record
        if pos1 > ZIP64_LIMIT:
            # Need to write the ZIP64 end-of-archive records
            zip64endrec = struct.pack(STRUCT_END_ARCHIVE_64, STRING_END_ARCHIVE_64, 44, 45, 45, 0, 0, count, count, pos2 - pos1, pos1)
            data.append(self.update_data_ptr(zip64endrec))
            zip64locrec = struct.pack(STRUCT_END_ARCHIVE_64_LOCATOR, STRING_END_ARCHIVE_64_LOCATOR, 0, pos2, 1)
            data.append(self.update_data_ptr(zip64locrec))
            # -1 sentinel: the real offset is in the ZIP64 record above.
            endrec = struct.pack(STRUCT_END_ARCHIVE, STRING_END_ARCHIVE, 0, 0, count, count, pos2 - pos1, -1, 0)
            data.append(self.update_data_ptr(endrec))
        else:
            endrec = struct.pack(STRUCT_END_ARCHIVE, STRING_END_ARCHIVE, 0, 0, count, count, pos2 - pos1, pos1, 0)
            data.append(self.update_data_ptr(endrec))
        return ''.join(data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment