Created
December 21, 2018 06:30
-
-
Save gimite/02f08a6c7789e4c1c4841ce0a24b7471 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct | |
import StringIO | |
import binascii | |
import zlib | |
import datetime | |
def deflate(data, compresslevel=9, flush_mode=zlib.Z_FINISH):
    """Compress *data* into a raw DEFLATE stream (no zlib/gzip wrapper).

    Args:
        data: Byte string to compress.
        compresslevel: zlib compression level, 0-9.
        flush_mode: Flush mode handed to the compressor once *data* has been
            fed in. The default ``zlib.Z_FINISH`` terminates the stream.

    Returns:
        The compressed bytes, including whatever the final flush emitted.
    """
    # wbits semantics:
    #   -15..-8: negated window size, header suppressed (raw deflate)
    #     8..15: normal zlib stream
    #    16..30: subtract 16, gzip header
    raw_window_bits = -zlib.MAX_WBITS
    compressor = zlib.compressobj(
        compresslevel,            # level: 0-9
        zlib.DEFLATED,            # method: must be DEFLATED
        raw_window_bits,          # window size in bits, see above
        zlib.DEF_MEM_LEVEL,       # mem level: 1..8/9
        zlib.Z_DEFAULT_STRATEGY,  # strategy 0; the alternatives are
                                  # Z_FILTERED, Z_HUFFMAN_ONLY, Z_RLE, Z_FIXED
    )
    return compressor.compress(data) + compressor.flush(flush_mode)
def inflate(data):
    """Decompress a raw DEFLATE stream (the counterpart of ``deflate``).

    Args:
        data: Byte string holding headerless deflate data.

    Returns:
        The decompressed bytes.
    """
    # A negative wbits value tells zlib not to expect a zlib/gzip header.
    inflater = zlib.decompressobj(-zlib.MAX_WBITS)
    return inflater.decompress(data) + inflater.flush()
class ResumableZipFileWriter(object):
    """Incrementally produce a ZIP archive, resumable across writer instances.

    A writer only buffers the bytes produced since it was created. The
    caller drains them with ``get_output()``, remembers ``get_state()`` and
    may later continue the very same archive with
    ``ResumableZipFileWriter(state)``, even in the middle of a file.

    Typical sequence: ``begin_file`` -> ``append_to_file`` (any number of
    times) -> ``end_file`` for every member, then ``flush`` once to emit
    the central directory.

    Sizes and offsets are written as 32-bit fields, so ZIP64-sized archives
    are not supported.
    """

    _VERSION_NEEDED = 20            # ZIP spec 2.0 (deflate)
    _VERSION_MADE_BY = 20
    _HOST_UNIX = 3                  # "made by" host system
    _FLAG_DATA_DESCRIPTOR = 0x0008  # sizes/CRC follow the file data
    _FLAG_UTF8_NAME = 0x0800        # language encoding flag (bit 11)
    _METHOD_DEFLATE = 8

    def __init__(self, state=None):
        """Create a writer.

        Args:
            state: Optional dict previously returned by ``get_state()``.
                When given, writing continues at the recorded offset with
                the recorded per-file metadata; otherwise a fresh archive
                is started.
        """
        # Imported locally so the class brings its own dependency; BytesIO
        # is a real binary buffer on both Python 2.6+ and Python 3.
        import io

        self.io = io.BytesIO()
        if state:
            self.base_offset = state['offset']
            # Copy each entry so this writer never mutates the caller's
            # state object (entry values are all immutable).
            self.files_metadata = [
                dict(entry) for entry in state['files_metadata']
            ]
        else:
            self.base_offset = 0
            self.files_metadata = []
        self.total_central_header_size = 0
        # Defined up front so the end-of-central-directory writer never
        # hits a missing attribute; flush() sets the real value.
        self.central_directory_start_offset = 0

    def get_output(self):
        """Return every byte this writer instance has produced so far."""
        return self.io.getvalue()

    def get_state(self):
        """Return a snapshot dict that lets a new writer resume the archive.

        The snapshot is independent of this writer: modifying it (or
        continuing to use this writer) does not affect the other.
        """
        return {
            'offset': self.get_current_offset(),
            'files_metadata': [dict(entry) for entry in self.files_metadata],
        }

    def get_current_offset(self):
        """Return the absolute archive offset of the next byte to write."""
        return self.base_offset + self.io.tell()

    def begin_file(self, name, modified_time=None, mode=0o644):
        """Start a new archive member and write its local file header.

        Args:
            name: Member name. Text is stored UTF-8 encoded; a byte string
                is stored as is.
            modified_time: ``datetime.datetime`` to record for the member.
                Defaults to the current local time at the moment of the
                call.
            mode: Permission bits stored in the external attributes.
        """
        # Resolve the default per call; a ``now()`` default in the signature
        # would be frozen at the time the class was defined.
        if modified_time is None:
            modified_time = datetime.datetime.now()
        file_metadata = {
            'name': name,
            'modified_time': modified_time,
            'mode': mode,
            'original_size': 0,
            'compressed_size': 0,
            'crc': 0,
            'offset': self.get_current_offset(),
        }
        self.files_metadata.append(file_metadata)
        self.write_header(file_metadata, is_central=False)

    def append_to_file(self, data):
        """Compress *data* (a byte string) and append it to the current member.

        Every call produces a self-contained, sync-flushed deflate chunk, so
        no compressor state has to survive between calls or writers.
        """
        compressed_data = deflate(data, flush_mode=zlib.Z_SYNC_FLUSH)
        current_metadata = self.files_metadata[-1]
        current_metadata['original_size'] += len(data)
        current_metadata['compressed_size'] += len(compressed_data)
        current_metadata['crc'] = (
            binascii.crc32(data, current_metadata['crc']) & 0xffffffff
        )
        self.io.write(compressed_data)

    def end_file(self):
        """Terminate the current member's deflate stream and write its descriptor."""
        # Compressing nothing with the default Z_FINISH yields just the
        # final block that closes the stream started by append_to_file().
        compressed_data = deflate(b'')
        self.io.write(compressed_data)
        current_metadata = self.files_metadata[-1]
        current_metadata['compressed_size'] += len(compressed_data)
        self.write_data_descriptor()

    def flush(self):
        """Write the central directory and the end-of-central-directory record."""
        # Start from zero so a repeated flush() cannot report a size that
        # still includes headers counted by an earlier call.
        self.total_central_header_size = 0
        self.central_directory_start_offset = self.get_current_offset()
        for file_metadata in self.files_metadata:
            self.write_header(file_metadata, is_central=True)
        self.write_end_of_central_directory_record()

    def _encode_name(self, name):
        """Return ``(name_bytes, extra_flags)`` for a member name."""
        if isinstance(name, bytes):
            return name, 0
        encoded = name.encode('utf-8')
        # UTF-8 output is longer than the text only when non-ASCII characters
        # are present; only then does the encoding flag need to be set.
        if len(encoded) != len(name):
            return encoded, self._FLAG_UTF8_NAME
        return encoded, 0

    def write_header(self, file_metadata, is_central):
        """Write a local file header or a central directory header.

        Args:
            file_metadata: Metadata dict created by ``begin_file``.
            is_central: True for a central directory entry (which carries
                the final CRC and sizes), False for the local header that
                precedes the file data (where those fields are zero and the
                real values follow in the data descriptor).
        """
        start = self.get_current_offset()
        name_bytes, name_flags = self._encode_name(file_metadata['name'])

        if is_central:
            self.pack4(0x02014b50)
            self.pack1(self._VERSION_MADE_BY)
            self.pack1(self._HOST_UNIX)
        else:
            self.pack4(0x04034b50)
        self.pack2(self._VERSION_NEEDED)
        self.pack2(self._FLAG_DATA_DESCRIPTOR | name_flags)
        self.pack2(self._METHOD_DEFLATE)

        # MS-DOS time/date: 2-second resolution, years counted from 1980.
        stamp = file_metadata['modified_time']
        self.pack2((stamp.hour << 11) | (stamp.minute << 5) | (stamp.second // 2))
        self.pack2(((stamp.year - 1980) << 9) | (stamp.month << 5) | stamp.day)

        if is_central:
            self.pack4(file_metadata['crc'])
            self.pack4(file_metadata['compressed_size'])
            self.pack4(file_metadata['original_size'])
        else:
            self.pack4(0)
            self.pack4(0)
            self.pack4(0)

        # The length field counts encoded bytes, not characters.
        self.pack2(len(name_bytes))
        self.pack2(0)  # extra field length
        if is_central:
            self.pack2(0)  # file comment length
            self.pack2(0)  # disk number start
            self.pack2(0)  # internal file attributes
            # The mode lives in the upper 16 bits of the external attributes.
            self.pack4((file_metadata['mode'] & 0xffff) << 16)
            self.pack4(file_metadata['offset'])
        self.io.write(name_bytes)

        if is_central:
            self.total_central_header_size += self.get_current_offset() - start

    def write_data_descriptor(self):
        """Write the data descriptor (CRC and sizes) for the current member."""
        current_metadata = self.files_metadata[-1]
        self.pack4(0x08074b50)
        self.pack4(current_metadata['crc'])
        self.pack4(current_metadata['compressed_size'])
        self.pack4(current_metadata['original_size'])

    def write_end_of_central_directory_record(self):
        """Write the record that closes the archive."""
        entry_count = len(self.files_metadata)
        self.pack4(0x06054b50)
        self.pack2(0)  # number of this disk
        self.pack2(0)  # disk where the central directory starts
        self.pack2(entry_count)  # entries on this disk
        self.pack2(entry_count)  # entries in total
        self.pack4(self.total_central_header_size)
        self.pack4(self.central_directory_start_offset)
        self.pack2(0)  # archive comment length

    def pack1(self, i):
        """Write *i* as an unsigned 8-bit integer."""
        self.io.write(struct.pack('<B', i))

    def pack2(self, i):
        """Write *i* as a little-endian unsigned 16-bit integer."""
        self.io.write(struct.pack('<H', i))

    def pack4(self, i):
        """Write *i* as a little-endian unsigned 32-bit integer."""
        self.io.write(struct.pack('<I', i))
# Demo: build hoge.zip in two sessions, handing the archive over from one
# writer instance to the next in the middle of the first member.
with open('hoge.zip', 'wb') as f:
    # Session 1: start hoge.txt, write the first part of its content and
    # drain everything produced so far into the output file.
    gen = ResumableZipFileWriter()
    gen.begin_file('hoge.txt')
    gen.append_to_file('hello ')
    first_part = gen.get_output()
    f.write(first_part)
    state = gen.get_state()

    # Session 2: a brand-new writer resumes from the saved state, finishes
    # hoge.txt, adds a second member and closes the archive.
    gen = ResumableZipFileWriter(state)
    gen.append_to_file('world')
    gen.end_file()

    gen.begin_file('foo.txt')
    gen.append_to_file('foo')
    gen.end_file()

    gen.flush()
    remaining_part = gen.get_output()
    f.write(remaining_part)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment