Skip to content

Instantly share code, notes, and snippets.

@linw1995
Created September 24, 2017 13:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save linw1995/7e49f6d05276538d8675874c585ece5d to your computer and use it in GitHub Desktop.
Save linw1995/7e49f6d05276538d8675874c585ece5d to your computer and use it in GitHub Desktop.
Huffman Encoding and Data Compression.
from collections import Counter
import argparse
from heapq import heapify, heappop, heappush
from itertools import count
from six import int2byte, byte2int
def huffman_tree(seq: list, frq: list) -> list:
''' 哈夫曼树 '''
num = count()
trees = list(zip(frq, num, seq))
heapify(trees)
while len(trees) > 1:
fa, _, a = heappop(trees)
fb, _, b = heappop(trees)
n = next(num)
heappush(trees, (fa + fb, n, [a, b]))
return trees[0][-1]
def huffman_codes(tree: list) -> dict:
''' 哈夫曼编码 '''
def codes(tree, prefix=''):
if isinstance(tree, int):
yield (tree, prefix)
return
for bit, child in zip('01', tree):
yield from codes(child, prefix + bit)
return dict(codes(tree))
def chunks(l: list, n: int):
''' 按长度分割列表 '''
for i in range(0, len(l), n):
yield l[i:i + n]
def encode_to_compressed_bytes(hcodes: str) -> bytes:
binary = [s for s in chunks(hcodes, 8)]
char_num = []
for s in binary[:-1]:
char_num.append(int(s, 2))
s = binary[-1]
char_num.append(len(s))
char_num.append(int(s, 2))
return b''.join([int2byte(c) for c in char_num])
def decode_to_huffman_codes(compressed_bytes: bytes) -> str:
codes = []
for bt in compressed_bytes[:-2]:
code = bin(bt)[2:]
code = '0' * (8 - len(code) % 8) + code if len(code) % 8 != 0 else code
codes.append(code)
length = compressed_bytes[-2]
code = bin(compressed_bytes[-1])[2:]
code = '0' * (length - len(code)) + code
codes.append(code)
return ''.join(codes)
def encode_header(char_counter: Counter) -> bytes:
rv = b''
length = len(char_counter)
count = len(bin(max(char_counter.values()))[2:]) // 8 + 1
rv += int2byte(length - 1)
rv += int2byte(count)
for k, v in char_counter.items():
rv += int2byte(k)
for _ in range(count):
rv += int2byte(v % 256)
v = v >> 8
return rv
def decode_header(fh) -> Counter:
length = byte2int(fh.read(1)) + 1
count = byte2int(fh.read(1))
char_counter = Counter()
for _ in range(length):
k = byte2int(fh.read(1))
v = 0
for i in range(count):
v = v + (byte2int(fh.read(1)) << 8 * i)
char_counter[k] = v
return char_counter
def compress(data: bytes) -> (bytes, Counter):
char_counter = Counter(data)
seq, frq = [], []
[(seq.append(k), frq.append(v)) for k, v in char_counter.items()]
hf_tree = huffman_tree(seq, frq)
hf_codes = huffman_codes(hf_tree)
codes = ''
for char in data:
codes += hf_codes.get(char)
return encode_to_compressed_bytes(codes), char_counter
def decompress(fh) -> bytes:
char_counter = decode_header(fh)
seq, frq = [], []
[(seq.append(k), frq.append(v)) for k, v in char_counter.items()]
hf_tree = huffman_tree(seq, frq)
data = fh.read()
codes = decode_to_huffman_codes(data)
rv = b''
tree = hf_tree
for bit in codes:
index = int(bit)
tree = tree[index]
if isinstance(tree, int):
rv += int2byte(tree)
tree = hf_tree
return rv
def main():
parser = argparse.ArgumentParser(
description='compress file by using Huffman Coding')
parser.add_argument(dest='filename', metavar='filename')
parser.add_argument('-o', '--outfile', action='store', help='output file')
parser.add_argument(
'-m',
'--mode',
type=int,
default=0,
choices=[0, 1],
help='0: compress 1: decompress')
args = parser.parse_args()
if args.mode == 0:
# 压缩
outfile = args.outfile or args.filename + '.hfm'
with open(args.filename, 'rb') as f:
data = f.read()
compressed_bytes, char_counter = compress(data)
with open(outfile, 'wb') as f:
f.write(encode_header(char_counter))
f.write(compressed_bytes)
else:
# 解压缩
outfile = args.outfile or '.'.join(args.filename.split('.')[:-1])
with open(args.filename, 'rb') as f:
data = decompress(f)
with open(outfile, 'wb') as f:
f.write(data)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment