Skip to content

Instantly share code, notes, and snippets.

@agfor
Created April 23, 2012 23:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save agfor/2474421 to your computer and use it in GitHub Desktop.
Save agfor/2474421 to your computer and use it in GitHub Desktop.
A Huffman Coder / decoder written as a sandbox to experiment with hybrid OO and functional code
from __future__ import with_statement
from heapq import heapify, heappop, heappushpop
from itertools import count
from mmap import mmap
try:
from collections import Counter
except:
def Counter(iterable):
frequencies = __import__('collections').defaultdict(int)
for item in iterable:
frequencies[item] += 1
return frequencies
class mmapfile(object):
"""Opens a file and mmaps it"""
def __init__(self, filename):
self.filename = filename
def __enter__(self):
self.infile = open(self.filename, 'r+b')
self.indata = mmap(self.infile.fileno(), 0)
return self.indata
def __exit__(self, *args):
self.indata.close()
self.infile.close()
class BitWriter(object):
"""Accepts bitstrings and writes to a file-like object"""
def __init__(self, outfile, codes = None):
self.outfile, self.codes, self.remainder = outfile, codes, ''
@staticmethod
def tobytes(bits):
return (chr(int(bits[i:i+8], 2)) for i in range(0, len(bits), 8))
def write(self, bits):
bits, self.remainder = self.remainder + bits, ''
bytes = len(bits) // 8
if not bytes:
self.remainder = bits
return
fullbits = bytes * 8
self.remainder, bits = bits[fullbits:], bits[:fullbits]
self.outfile.write(''.join(self.tobytes(bits)))
def bitpad(self):
"""Smart bitpadder - Uses a sequence that isn't a valid code to bitpad
This is an alternative to a prepended bit signifying padding
It doesn't require the reader know anything about the bitpadding"""
tail, end = -len(self.remainder) % 8, ''
if self.codes:
for end in (bin(i)[2:].zfill(tail) for i in range(2**tail)):
if all(not end.startswith(code) for code in self.codes):
break
if tail and not end:
print "Possible code collision with zero padding"
self.write(end or '0' * tail)
class BitReader(object):
"""Reads from a file-like object and returns bitstrings"""
def __init__(self, infile):
self.infile, self.remainder = infile, ''
def __iter__(self):
return self
def next(self):
return self.read(1)
@staticmethod
def tobits(bytestring):
for c in bytestring:
yield bin(ord(c))[2:].zfill(8)
def read(self, bits=None):
remainder = self.remainder
if bits is None:
self.remainder, data = '', remainder
return data + ''.join(self.tobits(self.infile.read()))
rlen = len(remainder)
if bits <= rlen:
self.remainder, data = remainder[bits:], remainder[:bits]
return data
bits -= rlen
numbytes, extra = divmod(bits, 8)
data = ''.join(self.tobits(self.infile.read(numbytes + (not not extra))))
self.remainder, data = data[bits:], remainder + data[:bits]
if not data:
raise StopIteration
return data
class BitFileWriter(BitWriter):
"""BitWriter that opens and closes its file and bitpads itself"""
def __init__(self, filename, codes = None):
self.filename, self.codes, self.remainder = filename, codes, ''
def open(self):
self.outfile = open(self.filename, 'wb')
return self
def close(self, *args):
self.bitpad()
self.outfile.close()
__enter__ = open
__exit__ = close
class BitFileReader(BitReader):
"""BitReader that opens and closes its file"""
def __init__(self, filename):
self.filename, self.remainder = filename, ''
def open(self):
self.infile = open(self.filename, 'r+b')
return self
def close(self, *args):
self.infile.close()
__enter__ = open
__exit__ = close
class Table(dict):
"""A table mapping bytes to bitstring codes or bitstring codes to bytes
It doesn't use shelve because shelve doesn't support iteration
It doesn't use csv because csv can't handle nul bytes"""
def flip(table):
"""Flip a table, works as the values are unique"""
return Table((value, key) for key, value in table.iteritems())
def isvaluetable(table):
"""Determing if a table is a valuetable or codetable"""
return all(len(key) == 1 for key in table)
def dump(table, tablefilename, code):
"""Dump a table to a file"""
outtable = table.flip() if code == table.isvaluetable() else table
with open(tablefilename, 'wb') as tablefile:
for key, value in outtable.iteritems():
tablefile.writelines((key, '\n', value, '\n'))
return table
@staticmethod
def load(codetablefilename, code):
"""Load a table from a file, correctly interpreting newlines"""
with open(codetablefilename, 'rb') as codetablefile:
table, key, newline = Table(), False, False
for value in codetablefile:
if value != '\n' or not newline:
if value == '\n':
newline = True
if key:
table[key[:-1] or '\n'] = value[:-1] or '\n'
key = False if key else value
return table.flip() if code == table.isvaluetable() else table
class Coder(str):
index = count().next
@classmethod
def codetree(coder, node):
"""Huffman codes a node"""
for i, child in enumerate(node[1:3]):
child[3] = node[3] + str(i)
if isinstance(child[1], list):
coder.codetree(child)
@classmethod
def code(coder, indata):
"""Creates a mapping of byte to code by
1. generating a heap of Nodes
2. turning them into a tree by frequency
3. recursively coding the tree
"""
heap = [[freq, coder.index(), value, ''] for value, freq in Counter(indata).iteritems()]
heapify(heap)
tempheap = heap[:]
head = heappop(tempheap)
while tempheap:
temp = heappop(tempheap)
head = heappushpop(tempheap, [head[0] + temp[0], head, temp, ''])
coder.codetree(head)
return Table(n[2:] for n in heap)
def codefile(filename):
"""Open a file, mmap it, and code it"""
with mmapfile(filename) as indata:
return filename.code(indata)
def writecodefile(filename):
"""Code a file and also save the codetable"""
return filename.codefile().dump(filename + '.codetable', True)
class Encoder(Table):
def encode(valuetable, indata):
"""Encode a string-like object by looking up each byte in a table"""
for byte in indata:
yield valuetable[byte]
def encodefile(valuetable, filename):
"""Open a file, mmap it, and use a valuetable to encode it"""
with mmapfile(filename) as indata:
for code in valuetable.encode(indata):
yield code
def writeencodefile(valuetable, filename, encodedfilename):
"""Encode a file and save the encoded version"""
with BitFileWriter(encodedfilename, valuetable.viewvalues()) as writer:
for code in valuetable.encodefile(filename):
writer.write(code)
class Decoder(Table):
def decode(codetable, codedata, code=''):
"""Decode a bitstring-like object by reading bits until a code is found
then yielding the corresponding value and repeating"""
for bit in codedata:
code += bit
if code in codetable:
yield codetable[code]
code = ''
def decodefile(codetable, codefilename):
"""Open an encoded file, mmap it, and use a codetable to decode it"""
with BitFileReader(codefilename) as codedata:
for code in codetable.decode(codedata):
yield code
def writedecodefile(codetable, codefilename, decodedfilename):
"""Decode an encoded file and save the decoded version"""
with open(decodedfilename, 'wb') as decodedfile:
for byte in codetable.decodefile(codefilename):
decodedfile.write(byte)
class HuffmanFile(str):
def compress(filename):
"""Code a file and save its codetable and encoded version"""
Encoder(Coder(filename).writecodefile()).writeencodefile(filename, filename + '.encoded')
def decompress(filename):
"""Load an encoded file and its codetable and save the decoded version"""
Decoder(Table.load(filename + '.codetable', True)).writedecodefile(filename + '.encoded', filename + '.decoded')
# Not included: Counter, mmapfile, test
__all__ = ['Huffman', 'Decoder', 'Encoder', 'Coder', 'Table',
'BitFileReader', 'BitFileWriter', 'BitReader', 'BitWriter']
def test():
from os.path import split, join
huffmanfile = HuffmanFile(__file__)
huffmanfile.compress()
huffmanfile.decompress()
with open(__file__, 'rb') as infile, open(__file__ + '.decoded', 'rb') as outfile:
print infile.read() == outfile.read()
print __file__, "successfully compressed and decompressed."
if __name__ == '__main__':
#__import__('cProfile').run("test()")
test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment