Skip to content

Instantly share code, notes, and snippets.

@gold24park
Last active November 12, 2022 07:34
Show Gist options
  • Save gold24park/88d7b5b129ad87c7cab3b1fde5834137 to your computer and use it in GitHub Desktop.
Save gold24park/88d7b5b129ad87c7cab3b1fde5834137 to your computer and use it in GitHub Desktop.
Compress and decompress a list of dictionaries with Python zstandard
import json
import zstandard as zstd
class IterStreamer(object):
    """
    File-like streaming iterator.

    Wraps an iterable of JSON-serializable items and exposes a ``read``
    method that hands them out as a byte stream: every item is dumped with
    ``json.dumps(..., ensure_ascii=False)``, followed by ``divider``, and
    encoded as UTF-8.
    """

    def __init__(self, generator, divider: str = "\r\n"):
        """
        Args:
            generator: iterable whose items are serialized one at a time.
            divider: text appended after each serialized item.
        """
        self.generator = generator
        self.iterator = iter(generator)
        # Bytes already serialized but not yet returned by read().
        self.leftover = b''
        self.divider = divider

    def __len__(self):
        """Return the length of the wrapped iterable (it must support len())."""
        return len(self.generator)

    def __iter__(self):
        """Return the underlying iterator; it yields the raw, unencoded items."""
        return self.iterator

    def next(self):
        """
        Serialize the next item and return it, divider included, as UTF-8 bytes.

        Raises:
            StopIteration: when the wrapped iterable is exhausted.
        """
        return (json.dumps(self.iterator.__next__(), ensure_ascii=False) + self.divider).encode('utf-8')

    def read(self, size=-1):
        """
        Return up to ``size`` bytes of the serialized stream.

        Args:
            size: maximum number of bytes to return. ``None`` or a negative
                value reads everything that is left.

        Returns:
            A bytes object; ``b''`` once the stream is exhausted.
        """
        read_all = size is None or size < 0
        # Collect pieces in a list and join once: repeated ``bytes +=`` copies
        # the accumulated data for every record.
        pieces = [self.leftover]
        count = len(self.leftover)
        try:
            while read_all or count < size:
                chunk = self.next()
                pieces.append(chunk)
                count += len(chunk)
        except StopIteration:
            pass
        data = b''.join(pieces)
        if read_all:
            self.leftover = b''
            return data
        # Keep the overshoot for the next call.
        self.leftover = data[size:]
        return data[:size]
class ZstdHelper:
    """
    Store a list of JSON-serializable objects in a zstd-compressed file.

    On disk (before compression) every object is one JSON document followed
    by ``DIVIDER``, encoded as UTF-8.
    """

    # Record separator. json.dumps escapes CR and LF inside strings, so the
    # raw sequence cannot occur inside a serialized record.
    DIVIDER = "\r\n"
    # Number of decompressed bytes requested per read in decompress().
    READ_SIZE = 512

    def compress(self, arr: list, filename: str):
        """
        Serialize ``arr`` and write it, zstd-compressed, to ``filename``.

        Args:
            arr: objects to store; each must be JSON-serializable.
            filename: path of the file to create or overwrite.
        """
        istream = IterStreamer(arr, divider=self.DIVIDER)
        cctx = zstd.ZstdCompressor()
        with open(filename, "wb") as ostream:
            cctx.copy_stream(istream, ostream)

    def decompress(self, filename: str) -> list:
        """
        Read ``filename`` and return the list of objects stored in it.

        Args:
            filename: path of a file written by ``compress``.

        Returns:
            The decoded objects, in file order. Bytes after the last
            divider (an unterminated record) are ignored.
        """
        result = []
        divider = self.DIVIDER.encode("utf-8")
        dctx = zstd.ZstdDecompressor()
        # Buffer raw bytes, not text: a fixed-size chunk may end in the
        # middle of a multi-byte UTF-8 character, so only complete records
        # are decoded.
        buffer = b''
        with open(filename, "rb") as f:
            with dctx.stream_reader(f) as reader:
                while True:
                    chunk = reader.read(self.READ_SIZE)
                    if not chunk:
                        break
                    buffer += chunk
                    # Everything before the final divider is complete; the
                    # tail is a partial record carried into the next round.
                    *records, buffer = buffer.split(divider)
                    for record in records:
                        result.append(json.loads(record.decode("utf-8")))
        return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment