Created
November 14, 2009 23:55
-
-
Save dsc/234867 to your computer and use it in GitHub Desktop.
BitString - A file-like class for reading and writing bits
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" BitString - A file-like object for reading/writing bits. | |
""" | |
# Pick the best available in-memory text buffer.  Only ImportError is caught,
# so unrelated failures (e.g. KeyboardInterrupt) are no longer swallowed.
try:
    from cStringIO import StringIO  # Python 2: C implementation
except ImportError:
    try:
        from StringIO import StringIO  # Python 2: pure-Python implementation
    except ImportError:
        from io import StringIO  # Python 3: the legacy modules were removed
def binlen(n):
    """Return how many binary digits abs(n) occupies (zero occupies one)."""
    # bin() yields a '0b'-prefixed string; slice the prefix off and count
    # the remaining digits.
    digits = bin(abs(n))[2:]
    return len(digits)
def mask(n):
    """Return an integer whose lowest n bits are all 1 (0 when n is 0)."""
    # 1 << n has a single 1 bit just above the wanted run; subtracting one
    # turns every bit below it on.
    above = 1 << n
    return above - 1
class EndOfBitfield(IOError):
    """Signals a read attempt when no bits remain in the bitfield."""
    pass
class BitString(object):
    """File-like object for reading and writing individual bits.

    Bits are packed most-significant-bit first into the characters of an
    underlying file-like buffer (``buf``).  Writes that do not fill a whole
    byte are held in a spill cache until enough bits accumulate or
    ``flush()`` is called; reads narrower than a byte leave the unread
    remainder of the byte in a peek cache.
    """

    # Underlying file-like buffer; replaced per instance in __init__.
    buf = None
    # Spill cache: written bits (fewer than a byte) not yet stored in buf.
    _spill = 0
    # Number of bits in the spill cache.
    _spillen = 0
    # Peek cache: bits already pulled from buf but not yet consumed.
    _peek = 0
    # Number of bits in the peek cache.
    _peeklen = 0

    def __init__(self, source='', buf=None):
        """Create a bit stream.

        source -- optional initial data, written to the buffer as-is; the
                  cursor is left wherever that write leaves it.
        buf    -- optional file-like buffer to use; a fresh StringIO is
                  created when omitted.
        """
        # Compare against None explicitly so a caller-supplied buffer that
        # happens to be falsy is not silently replaced.
        self.buf = StringIO() if buf is None else buf
        if source:
            self.buf.write(source)

    def writebits(self, n):
        """Write the bits of the non-negative number n to the stream.

        Exactly binlen(n) bits are written (the minimal binary form of n),
        so supplying 0 writes a single 0 bit.  Whole bytes go to the buffer
        immediately; leftover bits wait in the spill cache.

        Raises ValueError if n is negative.
        """
        if n < 0:
            # A negative value would corrupt the spill cache or fail later
            # inside chr(); reject it up front instead.
            raise ValueError('Cannot write a negative number: %r' % (n,))
        size = binlen(n)
        bits = (self._spill << size) | n
        size += self._spillen  # handles _spill == 0 but _spillen > 0
        while size >= 8:
            size -= 8
            # Emit the top 8 pending bits, then keep only what is left.
            self.buf.write(chr(bits >> size))
            bits &= mask(size)
        self._spill = bits
        self._spillen = size

    def flush(self):
        """Write any pending bits to the buffer, zero-padded to a full byte."""
        if self._spillen:
            # Left-align the pending bits within the byte.
            self.buf.write(chr(self._spill << (8 - self._spillen)))
        self._spill = 0
        self._spillen = 0

    def truncate(self):
        """Truncate the stream to zero bits and clear both bit caches."""
        # truncate() with no argument cuts at the current position, so
        # rewind first to really empty the buffer.
        self.buf.seek(0)
        self.buf.truncate()
        self._spill = 0
        self._spillen = 0
        self._peek = 0
        self._peeklen = 0

    def seek(self, n, mode=0):
        """Flush pending bits, drop the peek cache and move to a byte offset.

        mode: 0 = absolute, 1 = relative, 2 = relative to EOF (passed
        through to the underlying buffer's seek()).
        """
        self.flush()
        self._peek = 0
        self._peeklen = 0
        self.buf.seek(n, mode)

    def tell(self):
        """Return the underlying buffer's current position (a byte offset)."""
        return self.buf.tell()

    def readbits(self, n):
        """Read n bits from the stream and return them as a number.

        If the stream ends before n bits are available, the bits that were
        available are returned.  Raises EndOfBitfield when n is non-zero and
        no bits at all are left.
        """
        if n == 0:
            return 0
        size = self._peeklen
        bits = self._peek
        while size < n:
            byte = self.buf.read(1)
            if not byte:
                break
            size += 8
            bits = (bits << 8) | ord(byte)
        if size > n:
            # Keep the surplus low-order bits for the next read.
            self._peeklen = size - n
            self._peek = bits & mask(self._peeklen)
            bits >>= self._peeklen
        else:
            self._peeklen = 0
            self._peek = 0
        if size == 0:
            raise EndOfBitfield('Cannot read from end of stream')
        return bits

    def peek(self, n):
        """Return the next n bits without moving the cursor.

        Like readbits(), returns fewer bits if the stream ends early and
        raises EndOfBitfield when n is non-zero and nothing is left.
        """
        if n == 0:
            # Consistent with readbits(0): asking for nothing never fails.
            return 0
        size = self._peeklen
        bits = self._peek
        if size < n:
            # Remember where we started and return with an absolute seek;
            # some buffers (e.g. io.StringIO) reject relative seeks.
            start = self.buf.tell()
            while size < n:
                byte = self.buf.read(1)
                if not byte:
                    break
                size += 8
                bits = (bits << 8) | ord(byte)
            self.buf.seek(start)
        if size == 0:
            raise EndOfBitfield('Cannot read from end of stream')
        if size > n:
            bits >>= size - n
        return bits

    def iterbits(self, n=1):
        """Iterate over the stream n bits at a time until it is exhausted."""
        try:
            while True:
                yield self.readbits(n)
        except EndOfBitfield:
            pass

    def bin(self):
        """Return the whole stream as a '0b...' string of binary digits.

        Flushes pending bits first; the cursor position and the peek cache
        are restored afterwards.
        """
        self.flush()
        pos = self.buf.tell()
        # Set the peek cache aside: it belongs to the current cursor
        # position and must not leak into a dump that starts at offset 0.
        saved_peek = self._peek
        saved_peeklen = self._peeklen
        self._peek = 0
        self._peeklen = 0
        self.buf.seek(0)
        try:
            digits = ''.join(str(b) for b in self.iterbits(1))
        finally:
            self.buf.seek(pos)
            self._peek = saved_peek
            self._peeklen = saved_peeklen
        return '0b' + digits

    def dump(self):
        """Return the whole buffer contents as a string.

        Flushes the bit-buffer but leaves the cursor position unchanged.
        """
        self.flush()
        pos = self.buf.tell()
        self.buf.seek(0)
        s = self.buf.read()
        self.buf.seek(pos)
        return s

    def __repr__(self, dump_buf=True):
        """Describe the buffer (contents, or only its length when dump_buf
        is false), both bit caches and the cursor position.

        Relies on the buffer providing getvalue().
        """
        s = self.buf.getvalue()
        return 'BitString(%s, spill[%s]=%s, tell=%r, peek[%s]=%s)' % (
            'buf=%r' % s if dump_buf else 'len(buf)=%r' % len(s),
            self._spillen, bin(self._spill)[2:],
            self.tell(),
            self._peeklen, bin(self._peek)[2:]
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment