Skip to content

Instantly share code, notes, and snippets.

@bancek
Created July 29, 2016 09:49
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save bancek/7fe5fe0476e81495f99633cf91b6ee29 to your computer and use it in GitHub Desktop.
Save bancek/7fe5fe0476e81495f99633cf91b6ee29 to your computer and use it in GitHub Desktop.
Python ZIP streaming
import struct
import zipfile
import time
import os
from binascii import crc32
def commonprefix(m):
    """Return the longest leading run shared by every sequence in ``m``.

    Only the smallest and largest members (by ordinary comparison) are
    inspected: whatever prefix those two share is shared by everything
    that sorts between them.  Works element-wise, so ``m`` may hold
    strings or lists of path components.  An empty ``m`` yields ``''``.
    """
    if not m:
        return ''
    lowest, highest = min(m), max(m)
    position = 0
    limit = len(lowest)
    while position < limit:
        if lowest[position] != highest[position]:
            # First point of divergence: everything before it is common.
            return lowest[:position]
        position += 1
    return lowest
def relpath(path, start=os.path.curdir):
    """Return ``path`` expressed relative to the directory ``start``.

    Delegates to :func:`os.path.relpath` instead of re-implementing it.
    The standard-library version applies the platform's own rules (drive
    letters, UNC prefixes and case-insensitive comparison on Windows),
    which a plain split on ``os.path.sep`` does not.

    Args:
        path: The path to convert; must be non-empty.
        start: Directory the result is relative to.  Defaults to the
            current directory.

    Returns:
        The relative path, or ``os.path.curdir`` when both arguments
        name the same location.

    Raises:
        ValueError: If ``path`` is empty.
    """
    if not path:
        raise ValueError("no path specified")
    return os.path.relpath(path, start)
class ZipBuffer(object):
    """Write-only, in-memory sink handed to zipfile.ZipFile as its file.

    Written pieces are kept in ``data`` until collected with
    ``get_and_clear()``; ``pos`` counts every byte ever written and is
    never reset.
    """

    def __init__(self):
        self.data = []
        self.pos = 0

    def write(self, data):
        """Queue ``data`` and advance the running byte count."""
        self.data.append(data)
        self.pos = self.pos + len(data)

    def tell(self):
        """Report the total number of bytes written so far."""
        # Needed because zipfile asks its file object for the position.
        return self.pos

    def flush(self):
        """Do nothing; there is no underlying stream to flush."""
        # Needed because zipfile calls flush() on its file object.
        return None

    def get_and_clear(self):
        """Hand back the queued pieces and start a fresh, empty queue."""
        pending, self.data = self.data, []
        return pending
class XZipFile(zipfile.ZipFile):
    """ZipFile that can copy a file into the archive a block at a time."""

    def write_streaming(self, zinfo_or_arcname, filename, compress_type=None):
        """Stream the file at ``filename`` into the archive, uncompressed.

        This is a generator.  It yields ``None`` after every block copied
        to the archive's file object so the caller can drain that object
        between blocks.  The entry is registered in the archive only once
        the generator has been run to completion.

        Args:
            zinfo_or_arcname: Either the member name to use inside the
                archive or a ``zipfile.ZipInfo`` describing the member.
            filename: Path of the file on disk whose bytes are copied.
            compress_type: Must be ``None``; streaming always stores the
                bytes uncompressed.

        Raises:
            RuntimeError: If the archive is already closed or a
                ``compress_type`` is requested.
            IOError/OSError: If ``filename`` cannot be opened; nothing has
                been written for the entry at that point.
        """
        if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
            zinfo = zipfile.ZipInfo(
                filename=zinfo_or_arcname,
                date_time=time.localtime(time.time())[:6])
            # ``0o600`` is accepted by Python 2.6+ and Python 3 alike
            # (the bare ``0600`` spelling is a syntax error on Python 3).
            zinfo.external_attr = 0o600 << 16
        else:
            zinfo = zinfo_or_arcname

        if not self.fp:
            raise RuntimeError(
                "Attempt to write to ZIP archive that was already closed")
        if compress_type is not None:
            raise RuntimeError("Compression not supported!")

        # The bytes are copied verbatim, so the entry must be declared as
        # stored regardless of the archive default or a caller's ZipInfo.
        zinfo.compress_type = zipfile.ZIP_STORED
        # Bit 3: CRC and sizes follow the data instead of living in the
        # local header.  Always required here because they are unknown
        # until the whole file has been read.
        zinfo.flag_bits |= 0x08

        zinfo.CRC = crc = 0
        zinfo.compress_size = 0
        zinfo.file_size = file_size = 0

        # Open the source first so an unreadable file fails before any
        # bytes for this entry reach the archive.
        with open(filename, 'rb') as source:
            zinfo.header_offset = self.fp.tell()  # Start of header bytes
            self._writecheck(zinfo)
            self._didModify = True
            self.fp.write(zinfo.FileHeader())
            while True:
                buf = source.read(1024 * 8)
                if not buf:
                    break
                file_size += len(buf)
                crc = crc32(buf, crc) & 0xffffffff
                self.fp.write(buf)
                yield None

        zinfo.CRC = crc
        zinfo.file_size = file_size
        zinfo.compress_size = file_size
        # Write CRC and file sizes after the file data.
        # NOTE: these are 32-bit fields; a file larger than 0xFFFFFFFF
        # bytes makes struct.pack raise struct.error.
        self.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size,
                                  zinfo.file_size))
        self.filelist.append(zinfo)
        self.NameToInfo[zinfo.filename] = zinfo
        # Python 3's ZipFile records the central directory offset from
        # ``start_dir`` when closing; keep it pointing past this entry.
        # (Harmless on Python 2, which does not consult it in write mode.)
        self.start_dir = self.fp.tell()
class FileGenerator(object):
    """Minimal file-like reader over the ZIP stream built for a directory.

    Attributes:
        name: Suggested archive file name, ``<directory name>.zip``.
        generator: Iterator producing the archive's byte chunks.
    """

    def __init__(self, src_dir, passes_filters):
        """Prepare the stream for ``src_dir``.

        Both arguments are passed straight through to ``_stream_folder``.
        """
        self.name = self._archive_name(src_dir)
        self.generator = _stream_folder(src_dir, passes_filters)

    @staticmethod
    def _archive_name(src_dir):
        """Return the final component of ``src_dir`` with ``.zip`` added."""
        # Drop trailing separators first, otherwise basename() is empty.
        trimmed = src_dir.rstrip('/' + os.sep)
        # The extension needs its dot: "photos.zip", not "photoszip".
        return os.path.basename(trimmed) + '.zip'

    def read(self, n):
        """Return the next non-empty chunk, or ``b''`` once exhausted.

        ``n`` is accepted so the object can stand in for a file, but it is
        not used: a returned chunk may be shorter or longer than ``n``.
        """
        # Iterating works on Python 2 and 3 alike, unlike ``.next()``.
        for chunk in self.generator:
            if len(chunk):
                return chunk
        return b''
def stream_folder(src_dir, passes_filters):
    """Create the FileGenerator for ``src_dir``.

    Public entry point: both arguments are forwarded unchanged to the
    ``FileGenerator`` constructor and the new instance is returned.
    """
    streamer = FileGenerator(src_dir, passes_filters)
    return streamer
def _fs_text(path):
    """Return ``path`` as text, decoding byte strings as file names."""
    if isinstance(path, bytes):
        import sys  # local: only needed for byte-string paths
        return path.decode(sys.getfilesystemencoding() or 'utf-8')
    return path


def _stream_folder(src_dir, passes_filters):
    """Yield, chunk by chunk, a stored ZIP archive of the files in src_dir.

    Args:
        src_dir: Directory to walk recursively.
        passes_filters: Callable given each file's full path; the file is
            added to the archive only when it returns a true value.

    Yields:
        The pieces of the archive in order, as they are written into an
        in-memory ``ZipBuffer``.
    """
    sink = ZipBuffer()
    archive = XZipFile(sink, mode='w', compression=zipfile.ZIP_STORED,
                       allowZip64=True)
    # Walk with a text path so the file names come back as text.  Unlike
    # ``unicode(src_dir)`` this also runs on Python 3 and copes with
    # non-ASCII byte-string paths on Python 2.
    top = _fs_text(src_dir)
    for root, _dirs, files in os.walk(top):
        for f in files:
            path = os.path.join(root, f)
            if not passes_filters(path):
                continue
            # Archive member names always use forward slashes.
            rel_path = relpath(path, top).replace(os.path.sep, '/')
            for _ in archive.write_streaming(rel_path, path):
                for chunk in sink.get_and_clear():
                    yield chunk
            # Whatever was written after the last block (and everything
            # for an empty file) is still queued; release it now rather
            # than holding it until the next file or close().
            for chunk in sink.get_and_clear():
                yield chunk
    archive.close()
    # close() generates some more data, so we yield that too
    for chunk in sink.get_and_clear():
        yield chunk
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment