Skip to content

Instantly share code, notes, and snippets.

@linw1995 linw1995/LZW.py
Last active Jan 3, 2019

Embed
What would you like to do?
LZW Encoding and Data Compression.
# python version 3.7
# author: linw1995
# website: linw1995.com
import argparse
import io
import math
from typing import Dict, List
from six import int2byte
# Initial string table: each single-byte string maps to its own code (0-255).
ORIGINAL_CDICT = {int2byte(code): code for code in range(256)}
# Reverse lookup: the list index is the code, the item is the byte string.
ORIGINAL_KDICT = list(map(int2byte, range(256)))
# Upper bound, in bits (9 to 16), on the stored code width. It keeps the
# string table from growing too large and wasting space.
MAX_SIZE = 16
def compress(reader: io.IOBase, writer: io.IOBase):
    """Compress the bytes read from ``reader`` into LZW codes on ``writer``.

    The input is consumed one byte at a time. Each emitted code is the
    string-table index of the longest already-known prefix, written
    big-endian in ``ceil(size / 8)`` bytes, where ``size`` is the current
    code width in bits. The width starts at 9 and is recomputed from the
    table length after every new entry; once it reaches ``MAX_SIZE`` the
    table is reset to the initial single-byte entries and the width goes
    back to 9.

    Args:
        reader: binary stream providing the data to compress.
        writer: binary stream receiving the compressed codes.

    An empty input produces an empty output (nothing is written).
    """
    prefix = reader.read(1)  # longest string matched so far
    if not prefix:
        # Nothing to encode. Without this guard the final table lookup
        # below would be ``cdict[b'']`` and fail with KeyError.
        return

    cdict: Dict[bytes, int] = ORIGINAL_CDICT.copy()  # string -> code
    size = 9  # current code width in bits
    while True:
        char = reader.read(1)
        if not char:
            break
        next_prefix = prefix + char
        if next_prefix in cdict:
            # prefix + char is already known: keep extending the match.
            prefix = next_prefix
        else:
            # prefix + char is new:
            #   1. emit the code of the current prefix,
            #   2. register prefix + char in the string table,
            #   3. restart matching from char.
            n_bytes = math.ceil(size / 8)
            writer.write(
                cdict[prefix].to_bytes(byteorder='big', length=n_bytes))
            cdict[next_prefix] = len(cdict)
            size = math.ceil(math.log2(len(cdict)))
            prefix = char
            if size == MAX_SIZE:
                # Table is full: start over with the single-byte entries.
                cdict = ORIGINAL_CDICT.copy()
                size = 9

    # Flush the code for whatever match is still pending at end of input.
    writer.write(
        cdict[prefix].to_bytes(byteorder='big', length=math.ceil(size / 8)))
def decompress(reader: io.IOBase, writer: io.IOBase):
    """Decode the LZW code stream from ``reader`` and write bytes to ``writer``.

    Codes are read big-endian, ``ceil(size / 8)`` bytes at a time. The
    string table is rebuilt by replaying the encoder's prefix matching over
    every decoded byte, so entries are added (and the table is reset once
    the code width reaches ``MAX_SIZE``) in step with the encoder.

    Args:
        reader: binary stream providing the compressed codes.
        writer: binary stream receiving the decompressed data.

    Raises:
        ValueError: if the stream ends in the middle of a code.
    """
    cdict: Dict[bytes, int] = ORIGINAL_CDICT.copy()  # string -> code
    kdict: List[bytes] = ORIGINAL_KDICT.copy()  # code -> string
    prefix = b''  # encoder's current match, replayed on decoded bytes
    size = 9  # current code width in bits
    while True:
        # 1. Fetch the next code according to the current code width.
        n_bytes = math.ceil(size / 8)
        data = reader.read(n_bytes)
        if not data:
            break
        # A short read would otherwise be decoded as a different (smaller)
        # code and silently corrupt the output: top it up, or fail when the
        # stream really ends inside a code.
        while len(data) < n_bytes:
            more = reader.read(n_bytes - len(data))
            if not more:
                raise ValueError(
                    'truncated LZW stream: incomplete trailing code')
            data += more
        key = int.from_bytes(data, byteorder='big')

        if key == len(cdict):
            # The code refers to the entry that is about to be created:
            # it is the current match followed by its own first byte.
            chars = prefix + prefix[0:1]
        else:
            chars = kdict[key]

        # 2. Replay the encoder over the decoded bytes to keep the string
        #    table identical on both sides.
        for char in chars:
            next_prefix = prefix + int2byte(char)
            if next_prefix in cdict:
                # prefix + char is already known: keep extending the match.
                prefix = next_prefix
            else:
                # prefix + char is new: register it and restart from char.
                cdict[next_prefix] = len(cdict)
                kdict.append(next_prefix)
                prefix = int2byte(char)
                size = math.ceil(math.log2(len(cdict)))  # bits
                if size == MAX_SIZE:
                    # Table is full: start over with single-byte entries.
                    cdict = ORIGINAL_CDICT.copy()
                    kdict = ORIGINAL_KDICT.copy()
                    size = 9

        # 3. Emit the decoded string.
        writer.write(chars)
def main():
    """Command-line entry point: compress or decompress one file.

    Positional argument ``filename`` is the input file. ``-m/--mode``
    selects compression (0, the default) or decompression (1).
    ``-o/--outfile`` names the output file; when omitted, compression
    writes to ``<filename>.lzw`` and decompression writes to ``filename``
    with its last extension removed. If ``filename`` has no extension to
    remove, decompression writes to ``<filename>.out`` instead of trying to
    open an empty path.
    """
    import os  # local import: only needed to derive the default output name

    parser = argparse.ArgumentParser(
        description='compress file by using Lempel-Ziv-Welch Algorithm'
                    ', aka LZW')
    parser.add_argument(dest='filename', metavar='filename')
    parser.add_argument('-o', '--outfile', action='store', help='output file')
    parser.add_argument(
        '-m',
        '--mode',
        type=int,
        default=0,
        choices=[0, 1],
        help='0: compress 1: decompress')
    args = parser.parse_args()

    if args.mode == 0:
        # Compress.
        outfile = args.outfile or args.filename + '.lzw'
        with open(args.filename, 'rb') as in_f, open(outfile, 'wb') as out_f:
            compress(reader=in_f, writer=out_f)
    else:
        # Decompress.
        outfile = args.outfile
        if not outfile:
            # Splitting on every '.' yields an empty name for inputs such as
            # 'archive', './archive' or '.hidden'; splitext only strips a
            # real extension from the last path component.
            stem, ext = os.path.splitext(args.filename)
            outfile = stem if ext else args.filename + '.out'
        with open(args.filename, 'rb') as in_f, open(outfile, 'wb') as out_f:
            decompress(reader=in_f, writer=out_f)


if __name__ == '__main__':
    main()
import io
import random
import unittest
from LZW import compress, decompress
class LZWTestCase(unittest.TestCase):
    """Round-trip checks for the LZW ``compress`` / ``decompress`` pair."""

    def test(self):
        """Compress and decompress growing random payloads and compare."""
        payload = bytearray()
        for step in range(10):
            with self.subTest(i=step):
                # Grow the payload to 2**(10 + step) bytes drawn from the
                # ASCII digits '1'..'9' (byte values 49..57).
                missing = 2 ** (10 + step) - len(payload)
                payload.extend(
                    random.randint(49, 57) for _ in range(missing))

                packed_stream = io.BytesIO()
                compress(io.BytesIO(payload), packed_stream)
                packed = packed_stream.getvalue()

                print('\nsub test:', step)
                print('text len', len(payload))
                print('compressed len', len(packed))
                ratio = len(packed) / len(payload) * 100
                print('compress rate %.2f%%' % ratio)

                # Rewind so the same buffer can be read back by the decoder.
                packed_stream.seek(0)
                unpacked_stream = io.BytesIO()
                decompress(packed_stream, unpacked_stream)
                self.assertEqual(unpacked_stream.getvalue(), payload)


if __name__ == '__main__':
    unittest.main(verbosity=2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.