Last active
August 29, 2015 14:19
-
-
Save bign8/4d24537f57a9538bc702 to your computer and use it in GitHub Desktop.
CSCI-532 Huffman Codes Project
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
iliad.txt | |
tables.py | |
flashplayer.dmg |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from binascii import hexlify, unhexlify | |
from collections import defaultdict | |
from Queue import PriorityQueue | |
import re | |
from time import time | |
class CharGen(object):
    """Callable that cuts ``data`` into consecutive chunks of ``k`` items.

    Calling the instance returns a fresh generator each time, so the same
    object can be iterated repeatedly (once to count symbols, once to encode).
    """

    def __init__(self, data, k=1):
        """Remember the input and the chunk width.

        Args:
            data: Sliceable sequence with a length (callers pass file contents).
            k: Number of items per chunk; must be at least 1.

        Raises:
            ValueError: If ``k`` is smaller than 1.
        """
        if k < 1:
            raise ValueError('k must be >= 1, got {!r}'.format(k))
        self.data = data
        self.k = k

    def __call__(self):
        """Yield ``data[i:i + k]`` for ``i = 0, k, 2k, ...``.

        The final chunk is shorter than ``k`` when ``len(data)`` is not a
        multiple of ``k``.

        Raises:
            ValueError: If ``k`` was reassigned to a value below 1.
        """
        data, width = self.data, self.k
        if width < 1:
            # Guards against an endless loop if ``k`` was changed after init.
            raise ValueError('k must be >= 1, got {!r}'.format(width))
        start, end = 0, len(data)
        # A manual counter keeps the walk lazy on every Python version
        # without depending on the Python 2-only ``xrange``.
        while start < end:
            yield data[start:start + width]
            start += width

    def __repr__(self):
        """Return a fixed-width label such as ``' k = 1'`` for report lines."""
        return 'k = {}'.format(self.k).rjust(6)
class WordGen(object):
    """Callable that tokenises ``data`` into words and whitespace runs.

    Calling the instance returns a fresh generator each time, so the same
    object can be iterated repeatedly.
    """

    def __init__(self, data):
        """Remember the text to tokenise."""
        self.data = data

    def __call__(self):
        """Yield ``(token, is_whitespace)`` pairs in document order.

        ``is_whitespace`` is ``True`` for a run of whitespace and ``False``
        for a run of non-whitespace characters. A single scan produces both
        kinds of token, so text that starts with whitespace or ends without
        it is still tokenised in order, and joining the tokens reproduces
        ``data`` exactly.
        """
        # Group 1 captures whitespace runs; a match without it is a word.
        for match in re.finditer(r'(\s+)|\S+', self.data):
            yield match.group(), match.group(1) is not None

    def __repr__(self):
        """Return the fixed-width label ``'  word'`` for report lines."""
        return 'word'.rjust(6)
class Node(object):
    """Huffman tree node: a leaf carrying a symbol, or an internal pair.

    Nodes order by ``freq`` so that a priority queue hands back the
    lightest node first.
    """

    # Class-level defaults; every instance overrides them in ``__init__``.
    left = None
    right = None
    freq = 0
    letter = None

    def __init__(self, freq=0, letter=None, left=None, right=None):
        """Create a leaf (``freq`` and ``letter``) or an internal node.

        Args:
            freq: Occurrence count. When it is not positive, the frequency
                is computed as ``left.freq + right.freq`` instead.
            letter: Symbol stored in a leaf; ``None`` for internal nodes.
            left: Child reached by a ``0`` bit.
            right: Child reached by a ``1`` bit.
        """
        self.letter = letter
        self.freq = freq if freq > 0 else left.freq + right.freq
        self.left = left
        self.right = right

    def __lt__(self, other):
        """Order nodes by frequency (used by heap-based queues)."""
        return self.freq < other.freq

    def __cmp__(self, other):
        """Three-way comparison by frequency, consulted only by Python 2."""
        # Equivalent to cmp(self.freq, other.freq) without the removed builtin.
        return (self.freq > other.freq) - (self.freq < other.freq)

    def __repr__(self):
        """Render a leaf as ``letter: freq`` and a pair as ``(left, right)``."""
        if self.letter is not None:
            return '{}: {}'.format(self.letter, self.freq)
        return '({}, {})'.format(self.left, self.right)
def huffman(Q, n):
    """Merge the ``n`` nodes queued in ``Q`` into a single Huffman tree.

    The two lightest nodes are repeatedly removed and re-queued under a new
    parent until one node, the root, remains.

    Args:
        Q: Priority queue holding ``n`` nodes, lightest first.
        n: Number of nodes currently in ``Q``.

    Returns:
        The root of the tree. For ``n == 1`` this is the single queued node.

    Raises:
        ValueError: If ``n`` is smaller than 1. Without this check the final
            ``Q.get()`` would wait forever on an empty blocking queue.
    """
    if n < 1:
        raise ValueError('cannot build a Huffman tree from {} symbols'.format(n))
    for _ in range(n - 1):
        lightest = Q.get()
        runner_up = Q.get()
        Q.put(Node(left=lightest, right=runner_up))
    return Q.get()
def pre_process(generator):
    """Count symbols and queue one leaf node per distinct symbol.

    Args:
        generator: Zero-argument callable returning an iterable of hashable
            symbols.

    Returns:
        A ``(queue, count)`` tuple: a ``PriorityQueue`` of leaf ``Node``
        objects and the number of distinct symbols.
    """
    counts = {}
    for symbol in generator():
        counts[symbol] = counts.get(symbol, 0) + 1

    queue = PriorityQueue()
    for symbol in counts:
        queue.put(Node(freq=counts[symbol], letter=symbol))
    return queue, len(counts)
def pre_process_word(generator):
    """Count flagged and unflagged tokens separately and queue leaves for each.

    Args:
        generator: Zero-argument callable returning an iterable of
            ``(token, is_punct)`` pairs.

    Returns:
        A 4-tuple ``(aQ, a_count, pQ, p_count)``: the queue and distinct
        count for unflagged tokens, then the same for flagged tokens.
    """
    words, puncts = {}, {}
    for token, is_punct in generator():
        table = puncts if is_punct else words
        table[token] = table.get(token, 0) + 1

    queues = []
    for table in (words, puncts):
        queue = PriorityQueue()
        for token in table:
            queue.put(Node(freq=table[token], letter=token))
        queues.append(queue)

    word_queue, punct_queue = queues
    return word_queue, len(words), punct_queue, len(puncts)
def post_process(tree, key=''):
    """Flatten a Huffman tree into a ``{letter: bit-string}`` table.

    Args:
        tree: Root node; each node exposes ``letter``, ``left`` and ``right``.
        key: Prefix prepended to every code (the path walked so far).

    Returns:
        Dict mapping each leaf's letter to its path from ``tree``, with
        ``'0'`` for a left step and ``'1'`` for a right step. A tree that is
        a single leaf maps its letter to ``key`` unchanged.
    """
    codes = {}
    pending = [(tree, key)]
    while pending:
        node, prefix = pending.pop()
        if node.letter:
            codes[node.letter] = prefix
            continue
        # Push right first so the left subtree is processed first.
        pending.append((node.right, prefix + '1'))
        pending.append((node.left, prefix + '0'))
    return codes
def _pack_bits(codes):
    """Pack an iterable of ``'0'``/``'1'`` strings into a byte string.

    Layout: the concatenated bits are prefixed with a sentinel ``1`` bit,
    written as hexadecimal, padded with one ``0`` digit when the digit count
    is odd, converted to raw bytes, and followed by one marker byte:
    ``-`` when a padding digit was added, ``+`` otherwise.
    """
    # http://stackoverflow.com/a/7397689/3220865
    # The sentinel keeps leading zero bits alive through the int conversion.
    bits = '1' + ''.join(codes)
    digits = '%x' % int(bits, 2)
    clip = len(digits) % 2 == 1
    if clip:
        # unhexlify needs an even number of digits.
        # TODO: do bit magic to wrap clip boolean into last 2 bits of string
        digits += '0'
    # Bytes literals keep the concatenation valid on Python 2 and 3.
    return unhexlify(digits) + (b'-' if clip else b'+')


def compress(lookup, generator):
    """Encode every symbol from ``generator`` with ``lookup`` and pack it.

    Args:
        lookup: Dict mapping each symbol to its ``'0'``/``'1'`` code string.
        generator: Zero-argument callable returning an iterable of symbols.

    Returns:
        The packed byte string, ending in a ``+``/``-`` marker byte.

    Raises:
        KeyError: If a symbol has no entry in ``lookup``.
    """
    return _pack_bits(lookup[item] for item in generator())


def compress_word(a_lookup, p_lookup, generator):
    """Encode ``(token, is_punct)`` pairs using two code tables and pack them.

    Args:
        a_lookup: Code table for tokens whose flag is false.
        p_lookup: Code table for tokens whose flag is true.
        generator: Zero-argument callable returning an iterable of
            ``(token, is_punct)`` pairs.

    Returns:
        The packed byte string, ending in a ``+``/``-`` marker byte.

    Raises:
        KeyError: If a token is missing from the table selected by its flag.
    """
    return _pack_bits(
        (p_lookup if is_punct else a_lookup)[item]
        for item, is_punct in generator()
    )
def extract(source, tree):
    """Decode a byte string produced by ``compress`` back into text.

    Args:
        source: Packed byte string whose final byte is a marker: ``-`` means
            the last four bits are padding, anything else means no padding.
        tree: Root of the Huffman tree used for encoding; each node exposes
            ``letter``, ``left`` and ``right``.

    Returns:
        The concatenation of the letters found by walking ``tree`` bit by
        bit (``0`` goes left, ``1`` goes right), restarting at the root
        after each leaf.
    """
    # Slice instead of indexing so the marker test works for Python 2 str
    # and Python 3 bytes alike.
    clip = -4 if source[-1:] == b'-' else None
    # [3:] drops the '0b' prefix plus the sentinel 1 bit added when packing.
    bits = bin(int(hexlify(source[:-1]), 16))[3:clip]

    pieces, node = [], tree
    for bit in bits:
        node = node.right if bit == '1' else node.left
        if node.letter:
            pieces.append(node.letter)
            node = tree
    # One join avoids the quadratic cost of repeated string concatenation.
    return ''.join(pieces)
def test(gen):
    """Compress ``gen.data`` with one Huffman table, report sizes, verify it.

    Prints a summary line (label, packed size, ratio including the estimated
    table size, ratio without it, total size) plus the compress and extract
    timings, then asserts that decoding reproduces ``gen.data``.

    Args:
        gen: Symbol generator exposing ``data`` and callable for iteration.

    Returns:
        Packed size plus the estimated table size.
    """
    # print repr(gen.data)
    began = time()
    queue, symbol_count = pre_process(gen)
    tree = huffman(queue, symbol_count)
    lookup = post_process(tree)
    # print tree, '\n', lookup
    garbage = compress(lookup, gen)

    full = len(gen.data)
    small = len(garbage) - 1  # the trailing byte is the padding marker
    length = max(len(symbol) for symbol in lookup)
    # Table estimate: one slot of the widest symbol's length per entry.
    real = small + len(lookup) * length
    print('{} : {} {} {} {}'.format(
        gen, small, real / float(full), small / float(full), real))
    print('compress {}'.format(time() - began))

    began = time()
    source = extract(garbage, tree)
    print('extract {}'.format(time() - began))
    # print repr(source)
    assert gen.data == source
    return real
def test_word(gen):
    """Compress ``gen.data`` with separate word/whitespace tables and report.

    Prints a summary line (label, packed size, ratio including the estimated
    table sizes, ratio without them, total size) and the compress timing.

    Args:
        gen: Token generator exposing ``data`` and yielding
            ``(token, is_punct)`` pairs when called.

    Returns:
        Packed size plus the estimated size of both tables.
    """
    # print repr(gen.data)
    start = time()
    aQ, an, pQ, pn = pre_process_word(gen)
    a_lookup = post_process(huffman(aQ, an))
    p_lookup = post_process(huffman(pQ, pn))
    garbage = compress_word(a_lookup, p_lookup, gen)

    a_len = max(len(k) for k in a_lookup)
    p_len = max(len(k) for k in p_lookup)
    full = len(gen.data)
    small = len(garbage) - 1  # the trailing byte is the padding marker
    real = small + len(a_lookup) * a_len + len(p_lookup) * p_len
    print('{} : {} {} {} {}'.format(
        gen, small, real / float(full), small / float(full), real))
    print('compress {}'.format(time() - start))

    # NOTE: no round-trip check here. `extract` walks a single tree, so it
    # cannot decode a stream that interleaves two code tables.
    return real
def compare(file):
    """Run the per-character and per-word benchmarks on one file.

    Args:
        file: Path of the file to read and compress.
    """
    print('------- {} -------'.format(file))
    # Binary mode returns the bytes exactly as stored: no newline translation
    # and no text decoding, which matters for non-text inputs.
    with open(file, 'rb') as f:
        data = f.read()
    test(CharGen(data, 1))
    test_word(WordGen(data))
if __name__ == '__main__':
    # Run the comparison on each sample input in turn.
    for sample in ('iliad.txt', 'tables.py', 'flashplayer.dmg'):
        compare(sample)
    # Disabled experiment kept from the original script:
    # print min(xrange(1, 22), key=lambda x: test(CharGen(data, x)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment