Skip to content

Instantly share code, notes, and snippets.

@thatch
Created July 25, 2022 19:05
Show Gist options
  • Save thatch/c2e8aa3d34a7a548687f43813f9cb788 to your computer and use it in GitHub Desktop.
Save thatch/c2e8aa3d34a7a548687f43813f9cb788 to your computer and use it in GitHub Desktop.
from typing import Any
import sys
import zstandard
def find_optimum_blocksize(
    cctx: Any,
    past: bytes,
    data: bytes,
    *,
    window: int = 1024 * 1024,
    max_blocksize: int = 128 * 1024,
    min_blocksize: int = 128,
) -> int:
    """Return the block-flush interval that compresses *data* smallest.

    Candidate block sizes start at *max_blocksize* and are halved while they
    stay at or above *min_blocksize*.  For each candidate a fresh compression
    object is created from *cctx*, fed the trailing *window* bytes of *past*
    as history, and then fed *data* in slices of the candidate size with a
    block flush after every slice.  The candidate with the smallest total
    output wins; on a tie the larger (earlier) candidate is kept.  The search
    stops early as soon as a candidate is strictly worse than the best seen.

    Args:
        cctx: Object with a ``compressobj()`` method (for example a
            ``zstandard.ZstdCompressor``) whose result provides
            ``compress(bytes)`` and ``flush(mode)``.
        past: Bytes that precede *data* in the stream.
        data: The chunk whose flush interval is being tuned.
        window: How many trailing bytes of *past* to feed as history.
        max_blocksize: First (largest) candidate block size.
        min_blocksize: Smallest candidate block size that may be tried.

    Returns:
        The best candidate block size, in bytes.

    Raises:
        ValueError: If *min_blocksize* is below 1 or exceeds *max_blocksize*
            (no candidate could be tried).
    """
    if min_blocksize < 1 or max_blocksize < min_blocksize:
        raise ValueError(
            "need 1 <= min_blocksize <= max_blocksize, got %r and %r"
            % (min_blocksize, max_blocksize)
        )

    # ``past[-0:]`` would select the whole buffer, so a non-positive window
    # needs its own case to mean "no history".
    history = past[-window:] if window > 0 else b""

    optimum_blocksize = max_blocksize
    optimum_total = None
    blocksize = max_blocksize
    while blocksize >= min_blocksize:
        cobj = cctx.compressobj()
        # Only the output size matters, so count bytes instead of keeping them.
        total = len(cobj.compress(history))
        for start in range(0, len(data), blocksize):
            total += len(cobj.compress(data[start:start + blocksize]))
            total += len(cobj.flush(zstandard.COMPRESSOBJ_FLUSH_BLOCK))

        if optimum_total is None or total < optimum_total:
            optimum_blocksize = blocksize
            optimum_total = total
        elif total > optimum_total:
            # Strictly worse than the best so far: stop trying smaller sizes.
            break
        blocksize //= 2
    return optimum_blocksize
def main(level, input_file):
    """Compress *input_file* into ``<input_file>.4k.zst`` with tuned flushes.

    The input is processed in 128 KiB chunks in two passes.  The first pass
    calls ``find_optimum_blocksize`` for every chunk and prints the list of
    chosen block sizes.  The second pass re-reads the file and writes one
    zstd stream in which each chunk is fed to the compressor in slices of its
    chosen size, with a block flush after every slice.  The final output size
    is printed.

    Args:
        level: Compression level handed to
            ``zstandard.ZstdCompressionParameters.from_level``.
        input_file: Path of the file to compress; the output path is this
            path with ``.4k.zst`` appended.

    Raises:
        IndexError: If the second pass yields more chunks than the first one
            did (for example because the file grew in between).
    """
    chunk_size = 128 * 1024
    # find_optimum_blocksize only feeds the trailing 1 MiB of ``past`` to the
    # compressor, so keeping more history than that just costs memory and
    # copying.  Keep this at least as large as the window used there.
    history_limit = 1024 * 1024

    with open(input_file, "rb") as f:
        params = zstandard.ZstdCompressionParameters.from_level(level)
        cctx = zstandard.ZstdCompressor(compression_params=params)

        # Pass 1: pick a block size for every chunk.
        optimum_blocksizes = []
        past = b""
        for data in iter(lambda: f.read(chunk_size), b""):
            optimum_blocksizes.append(find_optimum_blocksize(cctx, past, data))
            past = (past + data)[-history_limit:]

        print(optimum_blocksizes)
        f.seek(0, 0)

        # Pass 2: write the real stream.  The file is read twice instead of
        # tuning and writing in a single pass because the original author
        # suspected state leaking between compressobj instances created from
        # the same context.
        with open(input_file + ".4k.zst", "wb") as fo:
            cobj = cctx.compressobj()
            chunks = iter(lambda: f.read(chunk_size), b"")
            for index, data in enumerate(chunks):
                blocksize = optimum_blocksizes[index]
                for start in range(0, len(data), blocksize):
                    fo.write(cobj.compress(data[start:start + blocksize]))
                    fo.write(cobj.flush(zstandard.COMPRESSOBJ_FLUSH_BLOCK))
            # A flush without a mode argument ends the stream.
            fo.write(cobj.flush())
            print("Size is", fo.tell())
if __name__ == "__main__":
    # Fail with a usage hint instead of a bare IndexError traceback when the
    # two required arguments are missing.
    if len(sys.argv) < 3:
        sys.exit("usage: %s LEVEL INPUT_FILE" % sys.argv[0])
    # The first argument is negated before it is used as the compression
    # level.  NOTE(review): presumably so that a zstd-CLI-style flag such as
    # "-19" selects level 19 -- confirm the intended calling convention.
    main(-int(sys.argv[1]), sys.argv[2])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment