Skip to content

Instantly share code, notes, and snippets.

@gimite
Created December 21, 2018 06:30
Show Gist options
  • Save gimite/02f08a6c7789e4c1c4841ce0a24b7471 to your computer and use it in GitHub Desktop.
Save gimite/02f08a6c7789e4c1c4841ce0a24b7471 to your computer and use it in GitHub Desktop.
import binascii
import datetime
import io
import StringIO
import struct
import zlib
def deflate(data, compresslevel=9, flush_mode=zlib.Z_FINISH):
    """Compress *data* into a raw deflate stream (no zlib/gzip header).

    A fresh compressor is created on every call, so the returned bytes never
    refer back to data passed in an earlier call.

    Args:
        data: The bytes to compress.
        compresslevel: zlib compression level, 0-9.
        flush_mode: Mode handed to the compressor's ``flush()``; the default
            ``zlib.Z_FINISH`` terminates the stream.

    Returns:
        The compressed bytes, including whatever the flush emitted.
    """
    compressor = zlib.compressobj(
        compresslevel,
        zlib.DEFLATED,            # the only method zlib supports
        -zlib.MAX_WBITS,          # negative window bits: suppress the header
        zlib.DEF_MEM_LEVEL,
        zlib.Z_DEFAULT_STRATEGY,  # == 0
    )
    # Tuple elements are evaluated left to right: compress first, then flush.
    pieces = (compressor.compress(data), compressor.flush(flush_mode))
    return b''.join(pieces)
def inflate(data):
    """Decompress a raw deflate stream (one without a zlib/gzip header).

    Args:
        data: The compressed bytes.

    Returns:
        The decompressed bytes, including anything still buffered in the
        decompressor when the input ran out.
    """
    # Negative window bits tell zlib not to expect a header.
    decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
    body = decompressor.decompress(data)
    tail = decompressor.flush()
    return body + tail
class ResumableZipFileWriter(object):
    """Write a ZIP archive piece by piece, with the ability to pause and resume.

    The writer only buffers the bytes produced since it was created; the
    caller fetches them with ``get_output()`` and appends them to the real
    destination. ``get_state()`` captures what a new writer needs in order to
    continue the archive (even in the middle of a file entry): the absolute
    offset reached so far and the per-file metadata.

    Every entry is deflate-compressed and is followed by a data descriptor,
    so sizes and CRC do not have to be known when the local header is written.
    Each ``append_to_file()`` call compresses its chunk independently, which
    is what makes resuming possible without carrying compressor state around.
    """

    # General purpose flag bit 3: CRC and sizes follow the data in a descriptor.
    _FLAG_DATA_DESCRIPTOR = 0x0008
    # General purpose flag bit 11: the file name is encoded as UTF-8.
    _FLAG_UTF8_NAME = 0x0800

    def __init__(self, state=None):
        """Create a writer, optionally continuing from ``get_state()`` output.

        Args:
            state: A dict previously returned by ``get_state()``, or a falsy
                value to start a new archive at offset 0.
        """
        self.io = io.BytesIO()
        if state:
            self.base_offset = state['offset']
            # Copy the entries so that this writer never mutates the caller's
            # state object (or the writer the state was taken from).
            self.files_metadata = [
                dict(entry) for entry in state['files_metadata']
            ]
        else:
            self.base_offset = 0
            self.files_metadata = []
        self.total_central_header_size = 0

    def get_output(self):
        """Return the bytes written by this writer instance so far."""
        return self.io.getvalue()

    def get_state(self):
        """Return a snapshot from which another writer can resume.

        The metadata entries are copied, so the snapshot stays consistent
        with its ``offset`` even if this writer keeps writing afterwards.
        """
        return {
            'offset': self.get_current_offset(),
            'files_metadata': [dict(entry) for entry in self.files_metadata],
        }

    def get_current_offset(self):
        """Return the absolute archive offset of the next byte to be written."""
        return self.base_offset + self.io.tell()

    def begin_file(self, name, modified_time=None, mode=0o644):
        """Start a new entry and write its local file header.

        Args:
            name: Entry name. Byte strings are written as-is; text strings
                are encoded as UTF-8.
            modified_time: ``datetime`` stored as the entry's timestamp.
                Defaults to the current time *at the moment of the call*.
            mode: Permission bits stored in the upper 16 bits of the
                external attributes field.
        """
        if modified_time is None:
            modified_time = datetime.datetime.now()
        file_metadata = {
            'name': name,
            'modified_time': modified_time,
            'mode': mode,
            'original_size': 0,
            'compressed_size': 0,
            'crc': 0,
            'offset': self.get_current_offset(),
        }
        self.files_metadata.append(file_metadata)
        self.write_header(file_metadata, is_central=False)

    def append_to_file(self, data):
        """Compress *data* and append it to the entry most recently begun.

        The chunk is sync-flushed so it ends on a byte boundary without
        terminating the deflate stream; sizes and CRC are updated as we go.
        """
        compressed_data = deflate(data, flush_mode=zlib.Z_SYNC_FLUSH)
        current_metadata = self.files_metadata[-1]
        current_metadata['original_size'] += len(data)
        current_metadata['compressed_size'] += len(compressed_data)
        # Mask keeps the running CRC an unsigned 32-bit value.
        current_metadata['crc'] = (
            binascii.crc32(data, current_metadata['crc']) & 0xffffffff
        )
        self.io.write(compressed_data)

    def end_file(self):
        """Terminate the current entry's deflate stream and write its descriptor."""
        # Compressing empty input with the default Z_FINISH yields just the
        # final block that closes the stream.
        compressed_data = deflate(b'')
        self.io.write(compressed_data)
        current_metadata = self.files_metadata[-1]
        current_metadata['compressed_size'] += len(compressed_data)
        self.write_data_descriptor()

    def flush(self):
        """Write the central directory and the end-of-central-directory record."""
        self.central_directory_start_offset = self.get_current_offset()
        # Start from zero so a repeated flush() does not report the summed
        # size of several central directories.
        self.total_central_header_size = 0
        for file_metadata in self.files_metadata:
            self.write_header(file_metadata, is_central=True)
        self.write_end_of_central_directory_record()

    @classmethod
    def _encode_name(cls, name):
        """Return ``(name_bytes, flags)`` for an entry name.

        Byte strings pass through untouched. Text is encoded as UTF-8, and
        the UTF-8 flag is raised when the text is not plain ASCII.
        """
        flags = cls._FLAG_DATA_DESCRIPTOR
        if isinstance(name, bytes):
            return name, flags
        encoded = name.encode('utf-8')
        if len(encoded) != len(name):
            flags |= cls._FLAG_UTF8_NAME
        return encoded, flags

    def write_header(self, file_metadata, is_central):
        """Write a local file header or a central directory header.

        Args:
            file_metadata: One entry of ``self.files_metadata``.
            is_central: True for a central directory header (which carries
                the final CRC/sizes and the entry offset); False for a local
                header (CRC/sizes are zero, the data descriptor holds them).
        """
        start = self.get_current_offset()
        name, flags = self._encode_name(file_metadata['name'])
        if is_central:
            self.pack4(0x02014b50)  # central directory header signature
            self.pack1(20)  # version made by: spec version byte
            self.pack1(3)  # version made by: host system byte
        else:
            self.pack4(0x04034b50)  # local file header signature
        self.pack2(20)  # version needed to extract
        self.pack2(flags)
        self.pack2(8)  # compression method: deflate
        time = file_metadata['modified_time']
        # Packed time and date fields; seconds are stored halved.
        self.pack2((time.hour << 11) | (time.minute << 5) | (time.second // 2))
        self.pack2(((time.year - 1980) << 9) | (time.month << 5) | time.day)
        if is_central:
            self.pack4(file_metadata['crc'])
            self.pack4(file_metadata['compressed_size'])
            self.pack4(file_metadata['original_size'])
        else:
            self.pack4(0)
            self.pack4(0)
            self.pack4(0)
        self.pack2(len(name))  # name length in bytes
        self.pack2(0)  # extra field length
        if is_central:
            self.pack2(0)  # comment length
            self.pack2(0)  # disk number start
            self.pack2(0)  # internal attributes
            self.pack4(file_metadata['mode'] << 16)  # external attributes
            self.pack4(file_metadata['offset'])  # offset of the local header
        self.io.write(name)
        if is_central:
            self.total_central_header_size += self.get_current_offset() - start

    def write_data_descriptor(self):
        """Write the data descriptor (CRC and sizes) for the current entry."""
        current_metadata = self.files_metadata[-1]
        self.pack4(0x08074b50)
        self.pack4(current_metadata['crc'])
        self.pack4(current_metadata['compressed_size'])
        self.pack4(current_metadata['original_size'])

    def write_end_of_central_directory_record(self):
        """Write the end-of-central-directory record that closes the archive."""
        self.pack4(0x06054b50)
        self.pack2(0)  # number of this disk
        self.pack2(0)  # disk where the central directory starts
        self.pack2(len(self.files_metadata))  # entries on this disk
        self.pack2(len(self.files_metadata))  # entries in total
        self.pack4(self.total_central_header_size)
        self.pack4(self.central_directory_start_offset)
        self.pack2(0)  # archive comment length

    def pack1(self, i):
        """Write *i* as an unsigned 8-bit integer."""
        self.io.write(struct.pack('<B', i))

    def pack2(self, i):
        """Write *i* as a little-endian unsigned 16-bit integer."""
        self.io.write(struct.pack('<H', i))

    def pack4(self, i):
        """Write *i* as a little-endian unsigned 32-bit integer."""
        self.io.write(struct.pack('<I', i))
# Demo: build hoge.zip in two passes, handing the writer state from one
# ResumableZipFileWriter instance to the next in the middle of an entry.
with open('hoge.zip', 'wb') as f:
    # First pass: start hoge.txt and write only part of its contents.
    gen = ResumableZipFileWriter()
    gen.begin_file('hoge.txt')
    gen.append_to_file('hello ')
    f.write(gen.get_output())
    # Capture the state the next writer is constructed from.
    state = gen.get_state()
    # Second pass: a new writer picks up from the saved state, finishes
    # hoge.txt, adds a second entry, and closes the archive.
    gen = ResumableZipFileWriter(state)
    gen.append_to_file('world')
    gen.end_file()
    gen.begin_file('foo.txt')
    gen.append_to_file('foo')
    gen.end_file()
    # flush() writes the central directory and the end-of-archive record.
    gen.flush()
    f.write(gen.get_output())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment