Created
November 18, 2014 10:19
-
-
Save socksy/d652e24286ad43471461 to your computer and use it in GitHub Desktop.
An example of how to implement Huffman coding in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
#author 100004004 | |
#Huffman encoding | |
import random | |
import math | |
# End-of-transmission marker (ASCII control code 4).  It pads the last chunk
# of a message whose length is not a multiple of the block size k.
EOT = "\x04"
def get_source_weights():
    """Interactively read the source alphabet and its weightings.

    Repeatedly prompts for a source character and then for its weighting,
    stopping when a blank character is entered.  Entering the same character
    again overwrites its earlier weighting.  A weighting is only accepted if
    it is a positive, finite number; anything else is reported and the
    weighting prompt is repeated.

    Returns:
        The collected ``{character: weighting}`` dictionary after it has been
        passed through ``normalise_weights`` (empty if nothing was entered).
    """
    # raw_input only exists on Python 2; input is its Python 3 equivalent.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    weightings = {}
    while True:
        char = read_line("Source Alphabet Char: ")
        if char == "":
            break
        weight = None
        while weight is None:
            try:
                weight = float(read_line("Character weighting: "))
            except ValueError:
                print("Error! Character weighting must be a number")
                continue
            # float() also parses "nan", "inf", zero and negatives; none of
            # those is usable as a probability weight, so ask again.
            if not 0 < weight < float("inf"):
                print("Error! Character weighting must be a positive number")
                weight = None
        weightings[char] = weight
    return normalise_weights(weightings)
def normalise_weights(weight_dict):
    """Scale the weights in place so that they add up to 1.

    Args:
        weight_dict: dictionary mapping symbols to numeric weights.  The
            dictionary is modified in place.

    Returns:
        The same dictionary object, with every weight divided by the sum of
        all weights.  An empty dictionary is returned unchanged.

    Raises:
        ZeroDivisionError: if the dictionary is non-empty and its weights
            sum to zero.
    """
    # Force a float total: with integer weights, Python 2's "/" would floor
    # every quotient to 0.
    total = float(sum(weight_dict.values()))
    for symbol in weight_dict:
        weight_dict[symbol] = weight_dict[symbol] / total
    return weight_dict
def extend(k, weight_dict, extended_dict=None):
    """Build the k-th extension of a weighted source alphabet.

    Every key of the result is a block of k characters whose weight is the
    product of the weights of its symbols.  So that messages whose length is
    not a multiple of k can still be encoded, blocks that stop early and are
    padded out to length k with EOT are included as well.

    Args:
        k: block length.  Values <= 1 perform no extension.
        weight_dict: dictionary mapping single source symbols to weights.
        extended_dict: optional partially extended dictionary to continue
            from.  ``None`` or an empty dictionary means "start from
            ``weight_dict``".

    Returns:
        The extended dictionary, normalised with ``normalise_weights``.  When
        no extension is performed this is ``weight_dict`` itself, normalised
        in place.
    """
    # First time around, extend the alphabet by itself.
    if not extended_dict:
        extended_dict = weight_dict

    while k > 1:
        next_extended_dict = {}
        # Fairly arbitrary weight for stopping here: as if EOT were equally
        # likely as every entry of the current dictionary.
        eot_share = 1.0 / len(extended_dict) if extended_dict else 0.0
        for prefix in extended_dict:
            prefix_weight = extended_dict[prefix]
            if prefix.endswith(EOT):
                # The message already ended inside this block: keep padding
                # with EOT so the key reaches length k.  Dropping the entry
                # here would leave padded chunks without a codeword.
                next_extended_dict[prefix + EOT] = prefix_weight
                continue
            for symbol in weight_dict:
                next_extended_dict[prefix + symbol] = \
                    weight_dict[symbol] * prefix_weight
            # Special end of transmission char allows messages not mod k.
            next_extended_dict[prefix + EOT] = prefix_weight * eot_share
        extended_dict = next_extended_dict
        k -= 1

    return normalise_weights(extended_dict)
def random_char(weight_dict):
    """Return a random key of ``weight_dict``, chosen in proportion to weight.

    Args:
        weight_dict: dictionary mapping symbols to weights, expected to sum
            to 1.

    Returns:
        One of the dictionary's keys, or ``None`` if the dictionary is empty.
    """
    rand = random.random()
    chosen = None
    # Keep subtracting each symbol's probability from the random number; the
    # symbol that takes it to zero or below is picked, which makes the chance
    # of picking a symbol proportional to its weight.
    for chosen in weight_dict:
        rand -= float(weight_dict[chosen])
        if rand <= 0:
            break
    # If rounding error (or weights summing to less than 1) leaves rand
    # positive after the last symbol, fall back to that last symbol instead
    # of returning None.
    return chosen
def make_source_message(weight_dict, length):
    """Build a random source message from the given weightings.

    Args:
        weight_dict: dictionary mapping symbols to weights, passed unchanged
            to ``random_char`` for every character.
        length: number of ``random_char`` draws to concatenate.

    Returns:
        The drawn symbols joined into a single string.
    """
    # range (not xrange) keeps this working on both Python 2 and Python 3.
    return ''.join(random_char(weight_dict) for _ in range(length))
class HuffTree(object):
    """Node of a binary Huffman tree.

    Attributes:
        weight: weight of this node.
        symbol: source symbol; only populated on leaves.
        zero: child reached by a "0" bit (``None`` if absent).
        one: child reached by a "1" bit (``None`` if absent).
    """

    def __init__(self, weight, symbol=None, zero=None, one=None):
        self.weight, self.symbol = weight, symbol
        self.zero, self.one = zero, one
def _combine(tree1, tree2):
    """Join two HuffTree nodes under a new parent node.

    The parent's weight is the sum of both weights; ``tree1`` becomes its
    "zero" child and ``tree2`` its "one" child.  The parent gets no symbol,
    so symbols are only ever found on leaves.
    """
    combined_weight = tree1.weight + tree2.weight
    return HuffTree(combined_weight, None, tree1, tree2)
def _weight_dict_to_tree_nodes(weight_dict): | |
"""Converts dictionary to a list of HuffTree nodes with same information""" | |
nodes = [] | |
for w in weight_dict: | |
nodes.append(HuffTree(weight_dict[w],w)) | |
return nodes | |
def make_huffman_tree(weight_dict):
    """Build a Huffman tree for a symbol -> weight dictionary.

    The dictionary is turned into leaf nodes, which ``_build`` then merges
    into a single tree; the root of that tree is returned.
    """
    leaves = _weight_dict_to_tree_nodes(weight_dict)
    root = _build(leaves)
    return root
def _build(nodes): | |
if len(nodes) == 1: | |
#bottom out if only one node left | |
return nodes[0] | |
else: | |
#otherwise sort the list, take the bottom two elements and combine | |
nodes.sort(key = lambda x: x.weight, reverse=True) | |
n1 = nodes.pop() | |
n2 = nodes.pop() | |
nodes.append(_combine(n2, n1)) | |
return _build(nodes) | |
def flatten_to_dict(tree, codeword="", code_dict=None):
    """Traverse the tree and collect each leaf symbol's code word.

    Args:
        tree: node with ``symbol``, ``zero`` and ``one`` attributes.
        codeword: bits accumulated on the path from the root to ``tree``.
        code_dict: dictionary to fill in.  A fresh dictionary is created when
            omitted, so separate calls never share results.

    Returns:
        ``code_dict``, mapping every leaf symbol below ``tree`` to its string
        of "0"/"1" characters.  A tree consisting of a single leaf gets the
        code word "0".
    """
    if code_dict is None:
        code_dict = {}
    # Bottom out: a node carrying a symbol is a leaf.
    if tree.symbol is not None:
        # An empty code word at a leaf means the source has only one symbol;
        # it still needs a non-empty code.
        code_dict[tree.symbol] = codeword or "0"
    else:
        flatten_to_dict(tree.zero, codeword + "0", code_dict)
        flatten_to_dict(tree.one, codeword + "1", code_dict)
    return code_dict
def huffman_encode(message, coding, k=1):
    """Encode a message with the given Huffman coding.

    The message is split into chunks of ``k`` characters; a final short chunk
    is padded on the right with EOT up to length ``k``.  Each chunk is then
    looked up in ``coding`` and the code words are concatenated.
    """
    padded_chunks = (piece.ljust(k, EOT) for piece in chunker(message, k))
    return "".join(coding[piece] for piece in padded_chunks)
def huffman_decode(code_message, hufftree):
    """Decode a string of "0"/"1" characters by walking the code tree.

    Args:
        code_message: iterable of the characters "0" and "1".
        hufftree: root node with ``symbol``, ``zero`` and ``one`` attributes.

    Returns:
        The decoded symbols joined into one string.  Trailing bits that do
        not complete a code word are ignored.

    Raises:
        ValueError: if ``code_message`` contains anything other than "0" or
            "1", or contains a "1" when the tree is a single leaf.
    """
    decoded = []
    # A tree that is just one leaf has no branches to follow; its only code
    # word is "0", so every "0" bit stands for that one symbol.
    root_is_leaf = hufftree.symbol is not None
    node = hufftree
    for bit in code_message:
        if bit not in ("0", "1"):
            raise ValueError("Code_message not binary")
        if root_is_leaf:
            if bit != "0":
                raise ValueError("Code word not present in single-symbol tree")
            decoded.append(hufftree.symbol)
            continue
        node = node.zero if bit == "0" else node.one
        if node.symbol is not None:
            # Reached a leaf: emit its symbol and restart from the root.
            decoded.append(node.symbol)
            node = hufftree
    return "".join(decoded)
def calculate_average_length(coding, weights, k=1):
    """Return the average code word length per source symbol.

    coding -- a dictionary of source alphabet to encoded values
    weights -- a dictionary of source alphabet to normalised weights
    k -- number of source symbols per coded block; the weighted length is
         divided by it
    """
    # Expected length: sum of Prob(block) * Length(code word of block).
    expected_length = sum(
        weights[symbol] * len(codeword) for symbol, codeword in coding.items()
    )
    return expected_length / k
def calculate_entropy(weights):
    """Return the entropy, in bits, of a weighted source alphabet.

    weights -- a dictionary of source alphabet to normalised weights

    Symbols with weight 0 contribute nothing (the usual 0 * log 0 = 0
    convention) instead of raising a math domain error.
    """
    # Negative sum of { Prob(letter i) * log_2(Prob(letter i)) }.
    return -sum(p * math.log(p, 2) for p in weights.values() if p != 0)
def _stdin_absint(string):
    """Prompt until the user enters a non-zero integer; return its magnitude.

    Args:
        string: prompt text shown to the user.

    Returns:
        The absolute value of the integer entered.  Zero is never returned:
        it is rejected and the prompt is repeated.
    """
    # raw_input only exists on Python 2; input is its Python 3 equivalent.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    while True:
        try:
            value = abs(int(read_line(string)))
        except ValueError:
            print("Please enter an integer")
            continue
        if value:
            return value
        # Zero was always re-prompted for; say why instead of staying silent.
        print("Please enter a non-zero integer")
# Helper method taken from http://stackoverflow.com/a/434328
def chunker(seq, size):
    """Iterate over ``seq`` in consecutive slices of ``size`` items.

    The final slice is shorter when ``len(seq)`` is not a multiple of
    ``size``.  Returns a generator of slices.
    """
    # range (not xrange) keeps this working on both Python 2 and Python 3.
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))
def main():
    """Run the interactive Huffman coding demonstration.

    Reads a weighted source alphabet from the user, builds and prints its
    Huffman coding with entropy and average code word length, encodes a
    random message, then repeats the exercise for a k-extension of the
    alphabet and decodes the k-extended encoding again.
    """
    # Start off by getting initial weights from the user
    weights = None
    print("Enter in the source alphabet and weightings, stop by entering a "
          "blank source letter.")
    while not weights:
        weights = get_source_weights()

    tree = make_huffman_tree(weights)
    # Always hand flatten_to_dict a fresh dict so the two codings built in
    # this function can never share entries.
    coding = flatten_to_dict(tree, code_dict={})
    print("Source alphabet:\n" + str(weights))
    entropy = calculate_entropy(weights)
    print("Entropy of the source: " + str(entropy))
    print("Huffman Encoding: " + str(coding))
    avlength = calculate_average_length(coding, weights)
    print("Average length of codeword: " + str(avlength))
    print("Difference between entropy and avlength: "
          + str(avlength - entropy))

    message_length = _stdin_absint("Enter a length for a message: ")
    message = make_source_message(weights, message_length)
    print("Source message:\n" + message)
    encoded = huffman_encode(message, coding)
    print("Encoded message:\n" + encoded)
    print("Encoded length: " + str(len(encoded)))
    # NB the message may not be an accurate representation of probabilities
    # so the encoding may be more compressed than entropy would allow
    if entropy > 0:
        ratio = str(float(len(encoded)) / float(message_length * entropy))
    else:
        # A single-symbol source has zero entropy; the ratio is undefined
        # and dividing would crash.
        ratio = "undefined (source entropy is zero)"
    print("Ratio difference from theoretical entropy optimum: " + ratio)
    # (somewhat) useless but fun info!
    print("Compression over ascii: "
          + str(float(len(encoded)) / float(message_length * 7)))

    k = _stdin_absint("Enter a value to extend the coding by: ")
    kweights = extend(k, weights)
    ktree = make_huffman_tree(kweights)
    kcoding = flatten_to_dict(ktree, code_dict={})
    kavlength = calculate_average_length(kcoding, kweights, k)
    print("New average length of codeword with k-extension of alphabet: "
          + str(kavlength))
    print("Difference between entropy and avlength: "
          + str(kavlength - entropy))
    kencoded = huffman_encode(message, kcoding, k)
    print("New encoding from k-extension:\n" + kencoded)
    print("New encoding length: " + str(len(kencoded)))
    print("Difference from without the k-extension: "
          + str(len(encoded) - len(kencoded)))
    print("No. of redundant characters: " + str(len(message) % k))
    print("Compression ratio from no extension: "
          + str(float(len(kencoded)) / float(len(encoded))))
    kdecoded = huffman_decode(kencoded, ktree)
    # Strip the EOT padding that was added to fill the last block.
    print("Decoded k-extended string:\n" + kdecoded.rstrip(EOT))
# Run the interactive demonstration only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment