# Created November 18, 2014 10:19
# An example of how to implement Huffman coding in Python
#!/usr/bin/env python
#author 100004004
#Huffman encoding
import random
import math
#End-of-transmission marker (ASCII control character 4), used as padding
EOT = "\x04"
def get_source_weights():
    """Prompt the user for the source alphabet and its weightings.

    Repeatedly asks for a source character followed by its weighting until
    a blank character is entered. Entering the same character twice
    overwrites its earlier weighting.

    Returns:
        A dict mapping each entered character to its weighting, passed
        through ``normalise_weights``. The dict is empty when no character
        was entered.
    """
    weightings = {}
    while True:
        char = raw_input("Source Alphabet Char: ")
        if char == "":
            #a blank entry marks the end of the alphabet
            break
        weight = None
        #re-prompt until a usable number is given; a weighting of zero is
        #falsy, so it is asked for again as well
        while not weight:
            try:
                weight = float(raw_input("Character weighting: "))
            except ValueError:
                print("Error! Character weighting must be a number")
        weightings[char] = weight
    return normalise_weights(weightings)
def normalise_weights(weight_dict):
    """Scale the weights in place so that they add up to 1.

    Args:
        weight_dict: dict mapping symbols to numeric weights. It is
            modified in place.

    Returns:
        The same dict object, now holding the normalised weights. An empty
        dict is returned unchanged.

    Raises:
        ZeroDivisionError: if the dict is non-empty and its weights sum to 0.
    """
    #force a float total so that integer weights are not truncated by
    #Python 2's integer division
    total = float(sum(weight_dict.values()))
    for symbol in weight_dict:
        weight_dict[symbol] = weight_dict[symbol] / total
    return weight_dict
def extend(k, weight_dict, extended_dict=None):
    """Build the k-extension of a weight dictionary.

    Extends single symbols to combinations of k symbols, each weighted by
    the product of the weights of the symbols it contains. For messages
    whose length does not divide evenly by k, combinations padded with the
    end of transmission symbol (EOT) are included as well, so that every
    key of the result is k characters long.

    Args:
        k: number of source symbols per combination; values <= 1 perform
            no extension.
        weight_dict: dict mapping single source symbols to weights.
        extended_dict: the partially extended dict carried through the
            recursion; callers normally leave it out.

    Returns:
        The extended dict, normalised by ``normalise_weights``. When no
        extension is needed this is ``weight_dict`` itself.
    """
    #first time around, extend by itself if need be
    if not extended_dict:
        extended_dict = weight_dict
    if k <= 1 or not extended_dict:
        #bottom out if not asked to extend (or there is nothing to extend)
        return normalise_weights(extended_dict)
    #fairly arbitrary choice for the weight of EOT: as if it were equally
    #probable with every entry of the dict currently being extended
    eot_weight = 1.0 / len(extended_dict)
    #need to extend, so permutate all options of adding one letter to
    #the current weight dictionary
    next_extended_dict = {}
    for prefix, prefix_weight in extended_dict.items():
        if prefix.endswith(EOT):
            #the combination has already ended: it can only be padded
            #further, so that it reaches the full length k
            next_extended_dict[prefix + EOT] = prefix_weight
            continue
        for symbol, symbol_weight in weight_dict.items():
            next_extended_dict[prefix + symbol] = symbol_weight * prefix_weight
        #use special end of transmission char to allow messages not mod k
        next_extended_dict[prefix + EOT] = prefix_weight * eot_weight
    return extend(k - 1, weight_dict, next_extended_dict)
def random_char(weight_dict):
    """Return a random key of weight_dict, chosen in proportion to its weight.

    Args:
        weight_dict: dict mapping chars to weights. The weights are
            expected to add up to 1.

    Returns:
        The chosen key, or None when the dict is empty.
    """
    remaining = random.random()
    chosen = None
    #keep subtracting source letter probabilities from the random number;
    #the letter that takes it to zero or below is picked, so each letter
    #is picked with probability proportional to its weight
    for chosen in weight_dict:
        remaining -= float(weight_dict[chosen])
        if remaining <= 0:
            break
    #if rounding left a tiny positive remainder after the last letter, the
    #loop simply ends and that last letter is returned instead of None
    return chosen
def make_source_message(weight_dict, length):
    """Make a random source message using the weightings given.

    Args:
        weight_dict: dict mapping source chars to weights, as accepted by
            ``random_char``.
        length: number of characters to generate.

    Returns:
        A string of ``length`` characters, each drawn independently with
        ``random_char``.
    """
    return ''.join(random_char(weight_dict) for _ in range(length))
class HuffTree(object):
    """Simple binary tree node for a Huffman code tree.

    Attributes:
        weight: weight of the node.
        symbol: the source symbol; only ever populated on leaves.
        zero: child reached by a "0" bit, or None.
        one: child reached by a "1" bit, or None.
    """
    def __init__(self, weight, symbol=None, zero=None, one=None):
        self.weight = weight
        #symbol only ever populated on leafs
        self.symbol = symbol
        self.zero = zero
        self.one = one
def _combine(tree1, tree2):
    """Combine two HuffTree nodes into a new parent node.

    The parent's weight is the sum of the two weights; tree1 becomes its
    "zero" child and tree2 its "one" child. No symbol is set on the parent,
    so symbols will only be found on leaves.
    """
    return HuffTree(weight=tree1.weight + tree2.weight, zero=tree1, one=tree2)
def _weight_dict_to_tree_nodes(weight_dict):
    """Convert a weight dict into a list of leaf HuffTree nodes.

    Args:
        weight_dict: dict mapping symbols to weights.

    Returns:
        A list with one HuffTree leaf per dict entry, carrying that
        entry's weight and symbol.
    """
    return [HuffTree(weight=weight, symbol=symbol)
            for symbol, weight in weight_dict.items()]
def make_huffman_tree(weight_dict):
    """Build a Huffman tree for the given symbol weights.

    The dict is first turned into leaf nodes, which are then merged into
    a single tree by ``_build``.

    Args:
        weight_dict: dict mapping symbols to weights.

    Returns:
        The root node of the resulting tree.
    """
    return _build(_weight_dict_to_tree_nodes(weight_dict))
def _build(nodes):
    """Merge a list of tree nodes into a single Huffman tree.

    Repeatedly sorts the list, removes the two lightest nodes and appends
    their combination, until one node is left. Written as a loop so that
    large alphabets (e.g. after a k-extension) cannot exhaust the
    recursion limit.

    Args:
        nodes: list of nodes with a ``weight`` attribute; it is consumed
            in place.

    Returns:
        The root node of the tree.

    Raises:
        ValueError: if ``nodes`` is empty.
    """
    if not nodes:
        raise ValueError("Cannot build a Huffman tree from an empty node list")
    while len(nodes) > 1:
        #heaviest first, so the two lightest nodes sit at the end
        nodes.sort(key=lambda node: node.weight, reverse=True)
        lightest = nodes.pop()
        second_lightest = nodes.pop()
        nodes.append(_combine(second_lightest, lightest))
    return nodes[0]
def flatten_to_dict(tree, codeword="", code_dict=None):
    """Traverse the tree, constructing a dict with the code words.

    Args:
        tree: root node; nodes expose ``symbol``, ``zero`` and ``one``.
            A node with a symbol is treated as a leaf.
        codeword: prefix put in front of every code word.
        code_dict: dict to add the code words to. A fresh dict is created
            when omitted, so separate calls do not share results.

    Returns:
        The dict mapping each symbol to its code word: "0" is appended for
        every step to a ``zero`` child and "1" for every ``one`` child.
    """
    if code_dict is None:
        code_dict = {}
    #explicit stack instead of recursion, so tree depth is not limited by
    #the interpreter's recursion limit
    pending = [(tree, codeword)]
    while pending:
        node, prefix = pending.pop()
        if node.symbol is not None:
            #leaf: the codeword is complete. An empty codeword means the
            #source has only 1 symbol, which gets the code "0"
            code_dict[node.symbol] = prefix or "0"
        else:
            pending.append((node.one, prefix + "1"))
            pending.append((node.zero, prefix + "0"))
    return code_dict
def huffman_encode(message, coding, k=1):
    """Encode a message with the huffman coding provided.

    Args:
        message: the source string.
        coding: dict mapping k-character source strings to code words.
        k: number of source characters encoded per code word.

    Returns:
        The concatenated code words as a string.

    Raises:
        KeyError: if a chunk of the message has no entry in ``coding``.
    """
    #the message is cut into chunks of k characters; a short final chunk
    #is filled up to length k with EOT symbols
    return "".join(coding[chunk.ljust(k, EOT)]
                   for chunk in chunker(message, k))
def huffman_decode(code_message, hufftree):
    """Take an encoded message and traverse the code tree to decode it.

    Args:
        code_message: string of "0" and "1" characters.
        hufftree: root node of the code tree; nodes expose ``symbol``,
            ``zero`` and ``one``.

    Returns:
        The decoded string. Trailing bits that do not complete a code word
        are ignored.

    Raises:
        ValueError: if the message contains a character other than "0" or
            "1", or a bit that a single-symbol tree cannot produce.
    """
    decoded = []
    if hufftree.symbol is not None:
        #the tree is a single leaf: such a source is encoded as one "0"
        #per symbol, and there are no children to walk
        for bit in code_message:
            if bit != "0":
                raise ValueError("Code_message does not match the code tree")
            decoded.append(hufftree.symbol)
        return "".join(decoded)
    node = hufftree
    for bit in code_message:
        if bit == "0":
            node = node.zero
        elif bit == "1":
            node = node.one
        else:
            raise ValueError("Code_message not binary")
        if node.symbol is not None:
            #reached a leaf: emit its symbol and restart from the root
            decoded.append(node.symbol)
            node = hufftree
    return "".join(decoded)
def calculate_average_length(coding, weights, k=1):
    """Calculate the average length of a coding.

    Args:
        coding: a dictionary of source alphabet to encoded values.
        weights: a dictionary of source alphabet to normalised weights;
            it must contain every key of ``coding``.
        k: the weighted sum is divided by this value.

    Returns:
        The sum over the coding of Prob(source letter i) *
        Length(encoded letter i), divided by k.
    """
    weighted_length = sum(len(coding[i]) * weights[i] for i in coding)
    return weighted_length / float(k)
def calculate_entropy(weights):
    """Calculate the entropy (in bits) of a source.

    Args:
        weights: a dictionary of source alphabet to normalised weights.

    Returns:
        The negative sum of Prob(source letter i) *
        log_2(Prob(source letter i)).
    """
    #letters with probability 0 contribute nothing to the sum and are
    #skipped, since math.log(0) would raise
    return -sum(p * math.log(p, 2) for p in weights.values() if p)
def _stdin_absint(string):
    """Helper function to take an absolute integer value from the user.

    Args:
        string: the prompt shown to the user.

    Returns:
        The absolute value of the integer entered. The prompt is repeated
        until an integer other than zero is entered.
    """
    x = None
    #zero is falsy, so entering 0 asks again just like invalid input does
    while not x:
        try:
            x = abs(int(raw_input(string)))
        except ValueError:
            print("Please enter an integer")
    return x
#Helper method taken from
def chunker(seq, size):
    """Iterate over seq in consecutive chunks of at most ``size`` items.

    Args:
        seq: a sliceable sequence (e.g. a string or list).
        size: chunk length; the last chunk may be shorter.

    Returns:
        A generator yielding the slices ``seq[0:size]``,
        ``seq[size:2*size]``, and so on.
    """
    #range (rather than xrange) keeps this helper usable on both
    #Python 2 and Python 3
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))
def main():
    """Run the interactive Huffman coding demonstration.

    Reads a source alphabet with weightings, prints its Huffman coding and
    statistics, encodes a random message, then repeats the encoding with a
    k-extension of the alphabet and decodes the result again.
    """
    #Start off by getting initial weights from the user
    weights = None
    print("Enter in the source alphabet and weightings, stop by entering a "
          "blank source letter.")
    while not weights:
        weights = get_source_weights()
    tree = make_huffman_tree(weights)
    coding = flatten_to_dict(tree)
    print("Source alphabet:\n" + str(weights))
    entropy = calculate_entropy(weights)
    print("Entropy of the source: " + str(entropy))
    print("Huffman Encoding: " + str(coding))
    avlength = calculate_average_length(coding, weights)
    print("Average length of codeword: " + str(avlength))
    print("Difference betweeen entropy and avlength: "
          + str(avlength - entropy))

    message_length = _stdin_absint("Enter a length for a message: ")
    message = make_source_message(weights, message_length)
    print("Source message:\n" + message)
    encoded = huffman_encode(message, coding)
    print("Encoded message:\n" + encoded)
    print("Encoded length: " + str(len(encoded)))
    #NB the message may not be an accurate representation of probabilities
    #so the encoding may be more compressed than entropy would allow
    optimum_length = float(message_length * entropy)
    if optimum_length:
        print("Ratio difference from theoretical entropy optimum: "
              + str(float(len(encoded)) / optimum_length))
    else:
        #a single-symbol source has zero entropy; the ratio is undefined
        print("Ratio difference from theoretical entropy optimum: "
              "n/a (source entropy is zero)")
    #(somewhat) useless but fun info!
    print("Compression over ascii: "
          + str(float(len(encoded)) / float(message_length * 7)))

    k = _stdin_absint("Enter a value to extend the coding by: ")
    kweights = extend(k, weights)
    ktree = make_huffman_tree(kweights)
    #pass an explicit dict so codes from the first coding are never reused
    kcoding = {}
    kcoding = flatten_to_dict(ktree, code_dict=kcoding)
    kavlength = calculate_average_length(kcoding, kweights, k)
    print("New average length of codeword with k-extension of alphabet: "
          + str(kavlength))
    print("Difference between entropy and avlength: "
          + str(kavlength - entropy))
    kencoded = huffman_encode(message, kcoding, k)
    print("New encoding from k-extension:\n" + kencoded)
    print("New encoding length: " + str(len(kencoded)))
    print("Difference from without the k-extension: "
          + str(len(kencoded) - len(encoded)))
    print("No. of redundant characters: " + str(len(message) % k))
    print("Compression ratio from no extension: "
          + str(float(len(kencoded)) / float(len(encoded))))
    kdecoded = huffman_decode(kencoded, ktree)
    #padding EOT symbols at the end are not part of the message
    print("Decoded k-extended string:\n" + kdecoded.rstrip(EOT))
#run the interactive demo only when executed as a script
if __name__ == "__main__":
    main()