|
#!/usr/bin/python3 |
|
|
|
# Copied-and-pasted parts of zstdstream.py from https://github.com/internetarchive/CDX-Writer (AGPL) |
|
|
|
import sys |
|
import zstandard as zstd |
|
|
|
from _zstd_cffi import ffi, lib |
|
|
|
# The input path is the first command-line argument; exit with a usage
# message instead of an IndexError traceback when it is missing.
if len(sys.argv) < 2:
    sys.exit('usage: %s FILE' % sys.argv[0])

# Binary mode: the content is consumed as raw bytes by the frame parser.
f = open(sys.argv[1], "rb")
|
|
|
class FrameParameters:
    """Plain-Python copy of the basic fields of a zstd frame header.

    Attributes:
        content_size: value of the header's ``frameContentSize`` field.
        window_size: value of the header's ``windowSize`` field.
        dict_id: value of the header's ``dictID`` field.
        has_checksum: ``True`` when the header's ``checksumFlag`` is truthy.
    """

    def __init__(self, fparams):
        """Copy the header fields out of *fparams*.

        Args:
            fparams: object exposing ``frameContentSize``, ``windowSize``,
                ``dictID`` and ``checksumFlag`` attributes.
        """
        self.content_size = fparams.frameContentSize
        self.window_size = fparams.windowSize
        self.dict_id = fparams.dictID
        # Normalise the integer flag to a real boolean.
        self.has_checksum = bool(fparams.checksumFlag)
|
|
|
class FrameParametersEx(FrameParameters):
    """``FrameParameters`` extended with the frame type.

    Attributes:
        frame_type: value of the header's ``frameType`` field.
    """

    def __init__(self, fparams):
        """Copy the base fields plus ``frameType`` out of *fparams*.

        Args:
            fparams: object exposing the attributes required by
                ``FrameParameters`` as well as ``frameType``.
        """
        super().__init__(fparams)
        # headerSize is deliberately not copied: it is always 0 for
        # skippable frames, so it carries no useful information here.
        self.frame_type = fparams.frameType
|
|
|
def _get_frame_parameters(data):
    """Parse the zstd frame header found at the start of *data*.

    Args:
        data: buffer-protocol object (e.g. ``bytes``) holding the first
            bytes of a zstd frame.

    Returns:
        A ``FrameParametersEx`` populated from the parsed
        ``ZSTD_frameHeader`` structure.

    Raises:
        zstd.ZstdError: if ``ZSTD_getFrameHeader`` reports an error, or if
            *data* is too short to contain the complete frame header.
    """
    params = ffi.new('ZSTD_frameHeader *')

    data_buffer = ffi.from_buffer(data)
    zresult = lib.ZSTD_getFrameHeader(params, data_buffer, len(data_buffer))
    if lib.ZSTD_isError(zresult):
        # Resolve the error name through libzstd directly: no ``_zstd_error``
        # helper is defined or imported in this module, so calling it would
        # raise NameError instead of the intended ZstdError.
        error_name = ffi.string(lib.ZSTD_getErrorName(zresult)).decode('utf-8')
        raise zstd.ZstdError('cannot get frame parameters: %s' % error_name)

    # A non-zero, non-error result means more input is required; the value
    # is reported to the caller as the number of bytes needed.
    if zresult:
        raise zstd.ZstdError(
            'not enough data for frame parameters; need %d bytes' % zresult)

    return FrameParametersEx(params[0])
|
|
|
|
|
def get_zstd_dictionary(fobj):
    """Read a zstd dictionary stored in the leading skippable frame of *fobj*.

    The first bytes of *fobj* are parsed as a zstd frame header. If they
    describe a skippable frame (``frame_type == 1``) with no dictionary ID
    and a known content size, the frame payload is read and, when it is
    itself a zstd-compressed frame, decompressed. The payload is returned
    only if it starts with the zstd dictionary magic number.

    Args:
        fobj: binary file object positioned at the start of the stream.
            ``peek`` is used when available; otherwise the header bytes are
            read and the position is restored with a relative ``seek``.
            When a candidate dictionary frame is found, the position is
            advanced past it even if the payload is ultimately rejected.

    Returns:
        The dictionary as ``bytes``, or ``b''`` when the stream does not
        start with a usable dictionary frame or a zstd error occurs.

    Raises:
        EOFError: if the stream ends before the whole frame payload
            announced by the header could be read.
    """
    zstd_frame_magic = b'\x28\xb5\x2f\xfd'
    zstd_dict_magic = b'\x37\xa4\x30\xec'

    # Look at the frame header (2 to 14 bytes, after the 4-byte magic)
    # without consuming it.
    if hasattr(fobj, 'peek'):
        data = fobj.peek(4 + 14)
    else:
        data = fobj.read(4 + 14)
        fobj.seek(-len(data), 1)

    try:
        frame_params = _get_frame_parameters(data)
    except zstd.ZstdError:
        return b''

    # A dictionary frame is a skippable frame (frame_type == 1) that does
    # not itself reference a dictionary.
    if frame_params.frame_type != 1 or frame_params.dict_id != 0:
        return b''

    content_size = frame_params.content_size
    if content_size == lib.ZSTD_CONTENTSIZE_UNKNOWN:
        return b''

    # getFrameHeader() does not set headerSize. Assume fixed length 8.
    fobj.seek(8, 1)
    zdict = fobj.read(content_size)
    if len(zdict) != content_size:
        # Explicit check rather than ``assert`` so that truncated input is
        # still rejected when Python runs with -O.
        raise EOFError(
            'truncated dictionary frame: expected %d bytes, got %d'
            % (content_size, len(zdict)))
    if frame_params.has_checksum:
        # Skip the 4 bytes following the payload when the checksum flag is set.
        fobj.seek(4, 1)

    magic = zdict[:4]
    if magic == zstd_frame_magic:
        # The payload is itself zstd compressed; unwrap it first.
        try:
            zdict = zstd.ZstdDecompressor().decompress(zdict)
        except zstd.ZstdError:
            return b''
        magic = zdict[:4]

    if magic == zstd_dict_magic:
        return zdict
    return b''
|
|
|
|
|
|
|
|
|
|
|
# Extract the dictionary and close the input file as soon as it has been
# read, so the handle is released even if extraction raises.
with f:
    zdict = get_zstd_dictionary(f)

# Emit the raw dictionary bytes on stdout (nothing when none was found).
sys.stdout.buffer.write(zdict)