Skip to content

Instantly share code, notes, and snippets.

@egor83
Created December 2, 2022 05:03
Show Gist options
  • Save egor83/4c818d002f5f12c8740d83271eca51a4 to your computer and use it in GitHub Desktop.
Save egor83/4c818d002f5f12c8740d83271eca51a4 to your computer and use it in GitHub Desktop.
Trying to write to a compressed zip file
from io import RawIOBase
from zipfile import ZipFile, ZipInfo, ZIP_DEFLATED
class UnseekableStream(RawIOBase):
    """Write-only, non-seekable in-memory sink that can be drained in pieces.

    Everything passed to :meth:`write` is accumulated in an internal buffer
    until :meth:`get` is called, which hands back the accumulated bytes and
    empties the buffer.  ``seekable()`` is inherited from ``RawIOBase`` and
    therefore reports ``False``.
    """

    def __init__(self):
        super().__init__()
        # bytearray grows in place, so repeated small writes do not copy the
        # whole pending buffer each time.
        self._buffer = bytearray()

    def writable(self):
        """Report that the stream accepts writes."""
        return True

    def write(self, b):
        """Append the bytes-like object *b* and return its length.

        Raises:
            ValueError: if the stream has already been closed.
        """
        if self.closed:
            raise ValueError('Stream was closed!')
        self._buffer += b
        return len(b)

    def get(self):
        """Return all bytes written since the previous call and reset the buffer."""
        chunk = bytes(self._buffer)
        self._buffer.clear()
        return chunk
def zipfile_generator(path, stream, chunk_size=16384):
    """Stream the file at *path* into a DEFLATE-compressed zip, piece by piece.

    The archive is written to *stream*; after every input chunk the bytes
    that have reached *stream* so far are drained with ``stream.get()`` and
    yielded, so the caller can forward the archive without holding it all in
    memory.

    Args:
        path: Path of the single file to store in the archive.
        stream: Writable file-like object that also provides a ``get()``
            method returning (and clearing) the bytes written so far.
        chunk_size: Number of bytes read from the input file per iteration.

    Yields:
        ``bytes`` pieces of the zip file, in order.  Pieces may be empty
        while the compressor is still buffering its input.
    """
    with ZipFile(stream, mode='w', compression=ZIP_DEFLATED) as zf:
        z_info = ZipInfo.from_file(path)
        # ZipFile.open() called with a ZipInfo uses the ZipInfo's own
        # compress_type, not the archive-level ``compression`` argument, and a
        # fresh ZipInfo defaults to ZIP_STORED.  Without this assignment the
        # entry is written uncompressed.
        z_info.compress_type = ZIP_DEFLATED
        with open(path, 'rb') as entry, zf.open(z_info, mode='w') as dest:
            for chunk in iter(lambda: entry.read(chunk_size), b''):
                dest.write(chunk)
                # Yield chunk of the zip file stream in bytes.
                yield stream.get()
    # ZipFile was closed: flush out the trailing data descriptor and the
    # central directory that close() has just written.
    yield stream.get()
# Demo driver: zip "small.csv" into "test.zip", writing each piece of the
# archive to disk as soon as the generator produces it.
stream = UnseekableStream()
path = "small.csv"
try:
    with open("test.zip", "wb") as f:
        for chunk in zipfile_generator(path, stream):
            f.write(chunk)
finally:
    # Close the in-memory stream even if zipping or writing failed.
    stream.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment