Created
April 23, 2012 23:04
-
-
Save agfor/2474421 to your computer and use it in GitHub Desktop.
A Huffman Coder / decoder written as a sandbox to experiment with hybrid OO and functional code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import with_statement | |
from heapq import heapify, heappop, heappushpop | |
from itertools import count | |
from mmap import mmap | |
try: | |
from collections import Counter | |
except: | |
def Counter(iterable): | |
frequencies = __import__('collections').defaultdict(int) | |
for item in iterable: | |
frequencies[item] += 1 | |
return frequencies | |
class mmapfile(object): | |
"""Opens a file and mmaps it""" | |
def __init__(self, filename): | |
self.filename = filename | |
def __enter__(self): | |
self.infile = open(self.filename, 'r+b') | |
self.indata = mmap(self.infile.fileno(), 0) | |
return self.indata | |
def __exit__(self, *args): | |
self.indata.close() | |
self.infile.close() | |
class BitWriter(object): | |
"""Accepts bitstrings and writes to a file-like object""" | |
def __init__(self, outfile, codes = None): | |
self.outfile, self.codes, self.remainder = outfile, codes, '' | |
@staticmethod | |
def tobytes(bits): | |
return (chr(int(bits[i:i+8], 2)) for i in range(0, len(bits), 8)) | |
def write(self, bits): | |
bits, self.remainder = self.remainder + bits, '' | |
bytes = len(bits) // 8 | |
if not bytes: | |
self.remainder = bits | |
return | |
fullbits = bytes * 8 | |
self.remainder, bits = bits[fullbits:], bits[:fullbits] | |
self.outfile.write(''.join(self.tobytes(bits))) | |
def bitpad(self): | |
"""Smart bitpadder - Uses a sequence that isn't a valid code to bitpad | |
This is an alternative to a prepended bit signifying padding | |
It doesn't require the reader know anything about the bitpadding""" | |
tail, end = -len(self.remainder) % 8, '' | |
if self.codes: | |
for end in (bin(i)[2:].zfill(tail) for i in range(2**tail)): | |
if all(not end.startswith(code) for code in self.codes): | |
break | |
if tail and not end: | |
print "Possible code collision with zero padding" | |
self.write(end or '0' * tail) | |
class BitReader(object): | |
"""Reads from a file-like object and returns bitstrings""" | |
def __init__(self, infile): | |
self.infile, self.remainder = infile, '' | |
def __iter__(self): | |
return self | |
def next(self): | |
return self.read(1) | |
@staticmethod | |
def tobits(bytestring): | |
for c in bytestring: | |
yield bin(ord(c))[2:].zfill(8) | |
def read(self, bits=None): | |
remainder = self.remainder | |
if bits is None: | |
self.remainder, data = '', remainder | |
return data + ''.join(self.tobits(self.infile.read())) | |
rlen = len(remainder) | |
if bits <= rlen: | |
self.remainder, data = remainder[bits:], remainder[:bits] | |
return data | |
bits -= rlen | |
numbytes, extra = divmod(bits, 8) | |
data = ''.join(self.tobits(self.infile.read(numbytes + (not not extra)))) | |
self.remainder, data = data[bits:], remainder + data[:bits] | |
if not data: | |
raise StopIteration | |
return data | |
class BitFileWriter(BitWriter): | |
"""BitWriter that opens and closes its file and bitpads itself""" | |
def __init__(self, filename, codes = None): | |
self.filename, self.codes, self.remainder = filename, codes, '' | |
def open(self): | |
self.outfile = open(self.filename, 'wb') | |
return self | |
def close(self, *args): | |
self.bitpad() | |
self.outfile.close() | |
__enter__ = open | |
__exit__ = close | |
class BitFileReader(BitReader): | |
"""BitReader that opens and closes its file""" | |
def __init__(self, filename): | |
self.filename, self.remainder = filename, '' | |
def open(self): | |
self.infile = open(self.filename, 'r+b') | |
return self | |
def close(self, *args): | |
self.infile.close() | |
__enter__ = open | |
__exit__ = close | |
class Table(dict): | |
"""A table mapping bytes to bitstring codes or bitstring codes to bytes | |
It doesn't use shelve because shelve doesn't support iteration | |
It doesn't use csv because csv can't handle nul bytes""" | |
def flip(table): | |
"""Flip a table, works as the values are unique""" | |
return Table((value, key) for key, value in table.iteritems()) | |
def isvaluetable(table): | |
"""Determing if a table is a valuetable or codetable""" | |
return all(len(key) == 1 for key in table) | |
def dump(table, tablefilename, code): | |
"""Dump a table to a file""" | |
outtable = table.flip() if code == table.isvaluetable() else table | |
with open(tablefilename, 'wb') as tablefile: | |
for key, value in outtable.iteritems(): | |
tablefile.writelines((key, '\n', value, '\n')) | |
return table | |
@staticmethod | |
def load(codetablefilename, code): | |
"""Load a table from a file, correctly interpreting newlines""" | |
with open(codetablefilename, 'rb') as codetablefile: | |
table, key, newline = Table(), False, False | |
for value in codetablefile: | |
if value != '\n' or not newline: | |
if value == '\n': | |
newline = True | |
if key: | |
table[key[:-1] or '\n'] = value[:-1] or '\n' | |
key = False if key else value | |
return table.flip() if code == table.isvaluetable() else table | |
class Coder(str): | |
index = count().next | |
@classmethod | |
def codetree(coder, node): | |
"""Huffman codes a node""" | |
for i, child in enumerate(node[1:3]): | |
child[3] = node[3] + str(i) | |
if isinstance(child[1], list): | |
coder.codetree(child) | |
@classmethod | |
def code(coder, indata): | |
"""Creates a mapping of byte to code by | |
1. generating a heap of Nodes | |
2. turning them into a tree by frequency | |
3. recursively coding the tree | |
""" | |
heap = [[freq, coder.index(), value, ''] for value, freq in Counter(indata).iteritems()] | |
heapify(heap) | |
tempheap = heap[:] | |
head = heappop(tempheap) | |
while tempheap: | |
temp = heappop(tempheap) | |
head = heappushpop(tempheap, [head[0] + temp[0], head, temp, '']) | |
coder.codetree(head) | |
return Table(n[2:] for n in heap) | |
def codefile(filename): | |
"""Open a file, mmap it, and code it""" | |
with mmapfile(filename) as indata: | |
return filename.code(indata) | |
def writecodefile(filename): | |
"""Code a file and also save the codetable""" | |
return filename.codefile().dump(filename + '.codetable', True) | |
class Encoder(Table): | |
def encode(valuetable, indata): | |
"""Encode a string-like object by looking up each byte in a table""" | |
for byte in indata: | |
yield valuetable[byte] | |
def encodefile(valuetable, filename): | |
"""Open a file, mmap it, and use a valuetable to encode it""" | |
with mmapfile(filename) as indata: | |
for code in valuetable.encode(indata): | |
yield code | |
def writeencodefile(valuetable, filename, encodedfilename): | |
"""Encode a file and save the encoded version""" | |
with BitFileWriter(encodedfilename, valuetable.viewvalues()) as writer: | |
for code in valuetable.encodefile(filename): | |
writer.write(code) | |
class Decoder(Table): | |
def decode(codetable, codedata, code=''): | |
"""Decode a bitstring-like object by reading bits until a code is found | |
then yielding the corresponding value and repeating""" | |
for bit in codedata: | |
code += bit | |
if code in codetable: | |
yield codetable[code] | |
code = '' | |
def decodefile(codetable, codefilename): | |
"""Open an encoded file, mmap it, and use a codetable to decode it""" | |
with BitFileReader(codefilename) as codedata: | |
for code in codetable.decode(codedata): | |
yield code | |
def writedecodefile(codetable, codefilename, decodedfilename): | |
"""Decode an encoded file and save the decoded version""" | |
with open(decodedfilename, 'wb') as decodedfile: | |
for byte in codetable.decodefile(codefilename): | |
decodedfile.write(byte) | |
class HuffmanFile(str): | |
def compress(filename): | |
"""Code a file and save its codetable and encoded version""" | |
Encoder(Coder(filename).writecodefile()).writeencodefile(filename, filename + '.encoded') | |
def decompress(filename): | |
"""Load an encoded file and its codetable and save the decoded version""" | |
Decoder(Table.load(filename + '.codetable', True)).writedecodefile(filename + '.encoded', filename + '.decoded') | |
# Not included: Counter, mmapfile, test | |
__all__ = ['Huffman', 'Decoder', 'Encoder', 'Coder', 'Table', | |
'BitFileReader', 'BitFileWriter', 'BitReader', 'BitWriter'] | |
def test(): | |
from os.path import split, join | |
huffmanfile = HuffmanFile(__file__) | |
huffmanfile.compress() | |
huffmanfile.decompress() | |
with open(__file__, 'rb') as infile, open(__file__ + '.decoded', 'rb') as outfile: | |
print infile.read() == outfile.read() | |
print __file__, "successfully compressed and decompressed." | |
if __name__ == '__main__': | |
#__import__('cProfile').run("test()") | |
test() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment