Skip to content

Instantly share code, notes, and snippets.

@dholth
Last active December 13, 2022 14:44
Show Gist options
  • Save dholth/0a8b26ddd361ae9a2440f19ceceaaa2e to your computer and use it in GitHub Desktop.
Save dholth/0a8b26ddd361ae9a2440f19ceceaaa2e to your computer and use it in GitHub Desktop.
zstandard compression versus expected size, streaming API
% scalene zstdtest.py
One-shot 2850333
4.179011374944821s
One-stream 2850330
4.084283375064842s
Right-size 2850333
3.7605239170370623s
Chunked stream 2850330
3.8705802909098566s
One-shot 2850333
3.7553894580341876s
"""
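Benchmark zstd compression through the one-shot and streaming APIs, with and
without a declared input size, then check that the outputs decompress back to
the original data.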
"""
import pathlib
import zstandard
import time
import io
import contextlib
@contextlib.contextmanager
def timeme():
    """Context manager that prints how long its ``with`` body took.

    The elapsed time is printed as ``"<seconds>s"`` when the block exits.
    The report is emitted from a ``finally`` clause, so it is still printed
    when the body raises; the exception then propagates unchanged.
    """
    # perf_counter() is the clock the standard library provides for
    # measuring short durations, which is what a benchmark needs.
    start = time.perf_counter()
    try:
        yield
    finally:
        end = time.perf_counter()
        print(f"{end-start}s")
# The Collected works of Charles Dickens from the Silesia corpus
# Read fully into memory when the script starts; the relative path means a
# file named "dickens" must exist in the current working directory.
uncompressed = pathlib.Path("dickens").read_bytes()
# Compression level handed to every ZstdCompressor created in this script.
# NOTE(review): 22 is presumably zstd's highest level — confirm against the
# zstd documentation.
ZSTD_COMPRESS_LEVEL = 22
def oneshot():
    """Compress the whole corpus with a single ``compress()`` call.

    Prints the compressed size followed by the elapsed time, and returns the
    compressed bytes.
    """
    cctx = zstandard.ZstdCompressor(level=ZSTD_COMPRESS_LEVEL)
    with timeme():
        compressed = cctx.compress(uncompressed)
        # Reported inside the timed block so the size line comes out before
        # the duration line that timeme() prints on exit.
        print("One-shot", len(compressed))
    return compressed
def onestream():
    """Compress the whole corpus with one write to a streaming writer.

    No source size is declared to the writer.  Prints the compressed size
    followed by the elapsed time, and returns the compressed bytes.
    """
    compressor = zstandard.ZstdCompressor(level=ZSTD_COMPRESS_LEVEL)
    with timeme():
        bio = io.BytesIO()
        # Leaving the ``with`` block closes the writer, which finishes the
        # zstd frame, so no explicit close() is needed.  closefd=False keeps
        # ``bio`` open so its contents can still be read afterwards.
        with compressor.stream_writer(bio, closefd=False) as writer:
            writer.write(uncompressed)
        # getvalue() copies the buffer; take the copy once and reuse it.
        result = bio.getvalue()
        print("One-stream", len(result))
    return result
def rightsize():
    """Compress the corpus through a streaming writer told the exact input size.

    Same as a single-write stream, except that ``size=len(uncompressed)`` is
    passed to ``stream_writer``.  Prints the compressed size followed by the
    elapsed time, and returns the compressed bytes.
    """
    compressor = zstandard.ZstdCompressor(level=ZSTD_COMPRESS_LEVEL)
    with timeme():
        bio = io.BytesIO()
        # Leaving the ``with`` block closes the writer, which finishes the
        # zstd frame, so no explicit close() is needed.  closefd=False keeps
        # ``bio`` open so its contents can still be read afterwards.
        with compressor.stream_writer(
            bio, closefd=False, size=len(uncompressed)
        ) as writer:
            writer.write(uncompressed)
        # getvalue() copies the buffer; take the copy once and reuse it.
        result = bio.getvalue()
        print("Right-size", len(result))
    return result
def multistream():
    """Compress the corpus through a streaming writer in about ten writes.

    No source size is declared to the writer.  Prints the compressed size
    followed by the elapsed time, and returns the compressed bytes.
    """
    compressor = zstandard.ZstdCompressor(level=ZSTD_COMPRESS_LEVEL)
    with timeme():
        bio = io.BytesIO()
        # Leaving the ``with`` block closes the writer, which finishes the
        # zstd frame, so no explicit close() is needed.  closefd=False keeps
        # ``bio`` open so its contents can still be read afterwards.
        with compressor.stream_writer(bio, closefd=False) as writer:
            # A tenth of the input per write.  Clamp to at least one byte:
            # for inputs shorter than ten bytes the integer division yields
            # 0, and range() rejects a zero step.
            chunk = max(1, len(uncompressed) // 10)
            for i in range(0, len(uncompressed), chunk):
                writer.write(uncompressed[i : i + chunk])
        # getvalue() copies the buffer; take the copy once and reuse it.
        result = bio.getvalue()
        print("Chunked stream", len(result))
    return result
def wrongsize():
    """Stream the corpus while declaring a size 100 bytes larger than it is.

    Demonstrates the failure mode of a mismatched ``size`` argument; per the
    note below, the expected outcome is an exception rather than output.
    Returns nothing.
    """
    # This one's easy - you get an exception
    cctx = zstandard.ZstdCompressor(level=ZSTD_COMPRESS_LEVEL)
    with timeme():
        sink = io.BytesIO()
        declared = len(uncompressed) + 100
        with cctx.stream_writer(sink, size=declared, closefd=False) as out:
            out.write(uncompressed)
            out.close()
        print("Wrong-size", len(sink.getvalue()))
# Run each benchmark once, in order.  The outputs of the one-shot and the
# unsized single-write stream runs are kept for the decompression checks
# that follow.
# NOTE(review): the names suggest the one-shot frame records its content
# size and the unsized stream frame does not — confirm against the
# python-zstandard documentation.
has_size = oneshot()
no_size = onestream()
rightsize()
multistream()
# One-shot is run a second time; presumably to compare against the first
# run's timing (e.g. warm-up effects) — confirm with the author.
oneshot()
def d1():
    """Check that the one-shot output round-trips through ``decompress()``.

    Raises:
        AssertionError: if the decompressed bytes differ from the input.
    """
    decompress = zstandard.ZstdDecompressor()
    # Raise explicitly instead of using ``assert`` so that the check, and the
    # decompression it drives, still run under ``python -O``.
    if uncompressed != decompress.decompress(has_size):
        raise AssertionError("one-shot output did not round-trip")


d1()
def d2():
    """Check that the unsized stream output round-trips via ``stream_reader``.

    Raises:
        AssertionError: if the decompressed bytes differ from the input.
    """
    # the decompress.decompress() API errors with zstd.ZstdError: could not
    # determine content size in frame header
    # The reader is used as a context manager so it is closed once the data
    # has been read.
    with zstandard.ZstdDecompressor().stream_reader(no_size) as reader:
        data = reader.read()
    # Raise explicitly instead of using ``assert`` so that the check still
    # runs under ``python -O``.
    if uncompressed != data:
        raise AssertionError("unsized stream output did not round-trip")


d2()
def d3():
    """Check that the one-shot output round-trips via ``stream_reader``.

    Raises:
        AssertionError: if the decompressed bytes differ from the input.
    """
    # streaming decompression with sized input
    # The reader is used as a context manager so it is closed once the data
    # has been read.
    with zstandard.ZstdDecompressor().stream_reader(has_size) as reader:
        data = reader.read()
    # Raise explicitly instead of using ``assert`` so that the check still
    # runs under ``python -O``.
    if uncompressed != data:
        raise AssertionError("sized output did not round-trip when streamed")


d3()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment