Last active
January 3, 2019 16:37
-
-
Save linw1995/38b805e35d667d45e51612c11c71a70b to your computer and use it in GitHub Desktop.
LZW Encoding and Data Compression.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# python version 3.7 | |
# author: linw1995 | |
# website: linw1995.com | |
import argparse | |
import io | |
import math | |
from typing import Dict, List | |
from six import int2byte | |
# Initial encoding table: every single-byte string maps to its own byte
# value (0-255).  Built with the builtin ``bytes`` constructor, so no
# Python 2 compatibility shim is needed.
ORIGINAL_CDICT = {bytes([x]): x for x in range(256)}
# Initial decoding table: the list index is the code and the item is the
# byte string that code stands for.
ORIGINAL_KDICT = [bytes([x]) for x in range(256)]
# 9 ~ 16: upper limit of the code width in bits.  When the table grows enough
# to need codes this wide it is reset, so it never becomes needlessly large.
MAX_SIZE = 16
def compress(reader: io.IOBase, writer: io.IOBase):
    """Compress the bytes read from *reader* into LZW codes on *writer*.

    The input is consumed one byte at a time.  Whenever the current match
    followed by the next byte is not in the table, the code of the current
    match is written, the longer string is added to the table, and matching
    restarts from that byte.  Each code is written big-endian in
    ``ceil(size / 8)`` bytes, where ``size`` is the current code width in
    bits.  When the width reaches ``MAX_SIZE`` the table is reset to the 256
    single-byte entries.

    An empty input produces an empty output.

    Args:
        reader: binary stream supplying the raw data.
        writer: binary stream receiving the encoded codes.
    """
    cdict: Dict[bytes, int] = ORIGINAL_CDICT.copy()  # string -> code table
    prefix = reader.read(1)  # longest table entry matched so far
    if not prefix:
        # Nothing to encode.  Without this guard the final lookup of the
        # empty prefix below would raise KeyError.
        return
    size = 9  # current code width in bits

    while True:
        char = reader.read(1)
        if not char:
            break

        next_prefix = prefix + char
        if next_prefix in cdict:
            # prefix + char is already known: keep extending the match.
            prefix = next_prefix
            continue

        # prefix + char is new:
        #   1. write the code of the current prefix to the output,
        #   2. register prefix + char in the table,
        #   3. restart the match from char.
        n_bytes = math.ceil(size / 8)
        writer.write(cdict[prefix].to_bytes(byteorder='big', length=n_bytes))
        cdict[next_prefix] = len(cdict)
        size = math.ceil(math.log2(len(cdict)))
        prefix = char

        if size == MAX_SIZE:
            # Table reached the width limit: start over with a fresh one.
            cdict = ORIGINAL_CDICT.copy()
            size = 9

    # Flush the code of the match that was still pending at end of input.
    writer.write(cdict[prefix].to_bytes(
        byteorder='big', length=math.ceil(size / 8)))
def decompress(reader: io.IOBase, writer: io.IOBase):
    """Decode the LZW code stream from *reader*, writing raw bytes to *writer*.

    Codes are read big-endian, ``ceil(size / 8)`` bytes at a time.  After a
    code is decoded, its bytes are replayed one by one through the same
    prefix-extension steps the encoder performs, which rebuilds the encoder's
    table (including its resets at ``MAX_SIZE``) in lock-step.

    Args:
        reader: binary stream supplying the encoded codes.
        writer: binary stream receiving the decoded data.

    Raises:
        ValueError: if the stream ends in the middle of a code.
        IndexError: if a code is larger than the next free table entry.
    """
    cdict: Dict[bytes, int] = ORIGINAL_CDICT.copy()  # string -> code table
    kdict: List[bytes] = ORIGINAL_KDICT.copy()  # code -> string table
    prefix = b''  # encoder's current match, as reconstructed so far
    size = 9  # current code width in bits

    while True:
        # Fetch the next code according to the current code width.
        n_bytes = math.ceil(size / 8)
        data = reader.read(n_bytes)
        if not data:
            break
        if len(data) != n_bytes:
            # A partial code would otherwise be decoded as a different,
            # smaller number and silently corrupt the output.
            raise ValueError(
                'truncated LZW stream: expected %d bytes for a code, got %d'
                % (n_bytes, len(data)))
        key = int.from_bytes(data, byteorder='big')

        if key == len(cdict):
            # The code refers to the entry that is about to be created: it
            # must be the previous match followed by its own first byte.
            chars = prefix + prefix[0:1]
        else:
            chars = kdict[key]

        for value in chars:
            char = bytes([value])  # iterating bytes yields ints
            next_prefix = prefix + char
            if next_prefix in cdict:
                # prefix + char is already known: keep extending the match.
                prefix = next_prefix
            else:
                # prefix + char is new:
                #   1. register prefix + char in both tables,
                #   2. restart the match from char.
                cdict[next_prefix] = len(cdict)
                kdict.append(next_prefix)
                prefix = char
                size = math.ceil(math.log2(len(cdict)))  # bits
                if size == MAX_SIZE:
                    # Same reset rule as the encoder.
                    cdict = ORIGINAL_CDICT.copy()
                    kdict = ORIGINAL_KDICT.copy()
                    size = 9

        # Emit the decoded bytes.
        writer.write(chars)
def main():
    """Command-line entry point: compress or decompress one file with LZW.

    The positional ``filename`` is the input file.  ``-m/--mode`` selects the
    operation (0 compresses, 1 decompresses; default 0) and ``-o/--outfile``
    sets the output path.  Without ``-o``, compression writes to
    ``<filename>.lzw`` and decompression writes to ``filename`` with its last
    extension removed.  If ``filename`` has no extension to remove, ``.out``
    is appended instead, so the default output path is never empty and never
    equal to the input path.
    """
    # Local import: only needed to derive the default output name.
    import os.path

    parser = argparse.ArgumentParser(
        description='compress file by using Lempel-Ziv-Welch Algorithm'
                    ', aka LZW')
    parser.add_argument(dest='filename', metavar='filename')
    parser.add_argument('-o', '--outfile', action='store', help='output file')
    parser.add_argument(
        '-m',
        '--mode',
        type=int,
        default=0,
        choices=[0, 1],
        help='0: compress 1: decompress')
    args = parser.parse_args()

    if args.mode == 0:
        # Compress.
        outfile = args.outfile or args.filename + '.lzw'
        with open(args.filename, 'rb') as in_f, open(outfile, 'wb') as out_f:
            compress(reader=in_f, writer=out_f)
    else:
        # Decompress.
        outfile = args.outfile
        if not outfile:
            # splitext only looks at the final path component, so dots in
            # directory names are left alone.
            root, ext = os.path.splitext(args.filename)
            outfile = root if ext else args.filename + '.out'
        with open(args.filename, 'rb') as in_f, open(outfile, 'wb') as out_f:
            decompress(reader=in_f, writer=out_f)
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
import random | |
import unittest | |
from LZW import compress, decompress | |
class LZWTestCase(unittest.TestCase):
    """Round-trip tests: decompressing compressed data must restore it."""

    def test(self):
        """Round-trip random digit data of growing size (2**10 .. 2**19)."""
        # Use a dedicated generator and report its seed with every sub-test,
        # so a failing run can be replayed with exactly the same data.
        seed = random.randrange(2 ** 32)
        rng = random.Random(seed)

        content = bytearray()
        for i in range(0, 10):
            with self.subTest(i=i, seed=seed):
                # Extend the shared buffer to this round's target length with
                # byte values 49..57 (ASCII '1'..'9').
                content.extend(
                    rng.randint(49, 57)
                    for _ in range(len(content), 2 ** (10 + i)))

                compress_out = io.BytesIO()
                compress_in = io.BytesIO(content)
                compress(compress_in, compress_out)
                compressed = compress_out.getvalue()

                print('\nsub test:', i)
                print('text len', len(content))
                print('compressed len', len(compressed))
                print('compress rate %.2f%%' %
                      (len(compressed) / len(content) * 100))

                # Rewind so the decoder reads the stream from the start.
                compress_out.seek(0, 0)
                decompress_out = io.BytesIO()
                decompress(compress_out, decompress_out)
                decompressed = decompress_out.getvalue()

                self.assertEqual(decompressed, content)
# Run the test suite with verbose output when this file is executed directly.
if __name__ == "__main__":
    unittest.main(verbosity=2)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment