Skip to content

Instantly share code, notes, and snippets.

@egor83
Created December 2, 2022 05:15
Show Gist options
  • Save egor83/b477dc891ef01a0df36ac63f96040f67 to your computer and use it in GitHub Desktop.
Save egor83/b477dc891ef01a0df36ac63f96040f67 to your computer and use it in GitHub Desktop.
Attempting streaming gzip compression.
import zlib
def yield_uncompressed_bytes():
# In a real case, would yield bytes pulled from the filesystem or the network
chunk = b'*' * 10
for _ in range(0, 5):
print('In: ', len(chunk))
yield chunk
def yield_compressed_bytes(_uncompressed_bytes):
compress_obj = zlib.compressobj()
for chunk in _uncompressed_bytes:
if compressed_bytes := compress_obj.compress(chunk):
yield compressed_bytes
if compressed_bytes := compress_obj.flush():
yield compressed_bytes
uncompressed_bytes = yield_uncompressed_bytes()
compressed_bytes = yield_compressed_bytes(uncompressed_bytes)
with open("test2.gz", "wb") as f_out:
for chunk in compressed_bytes:
# In a real case, could save to the filesystem, or send over the network
# print('Out:', len(chunk))
f_out.write(chunk)
# doesn't recognize the result as an archive
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment