Skip to content

Instantly share code, notes, and snippets.

@dsc
Created November 14, 2009 23:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dsc/234867 to your computer and use it in GitHub Desktop.
Save dsc/234867 to your computer and use it in GitHub Desktop.
BitString - A file-like class for writing bits
""" BitString - A file-like object for reading/writing bits.
"""
# Pick the best available in-memory stream implementation.
# Only ImportError is caught: a bare ``except`` would also swallow
# KeyboardInterrupt/SystemExit and hide unrelated failures.
try:
    # Python 2: C-accelerated implementation.
    from cStringIO import StringIO
except ImportError:
    try:
        # Python 2 without the C extension.
        from StringIO import StringIO
    except ImportError:
        # Neither legacy module exists (Python 3); io.StringIO offers the
        # same read/write/seek/tell/getvalue API.
        from io import StringIO
def binlen(n):
    """Return how many binary digits the magnitude of ``n`` occupies.

    The sign of ``n`` is ignored. Zero still counts as one digit, because
    ``bin(0)`` is ``'0b0'``.
    """
    digits = bin(abs(n))
    # Discount the two-character '0b' prefix that bin() always emits.
    return len(digits) - len('0b')
def mask(n):
    """Return an integer whose lowest ``n`` bits are all set (``n`` ones)."""
    # 1 << n has a single set bit just above the run we want; subtracting one
    # turns it into n consecutive ones.
    above_run = 1 << n
    return above_run - 1
class EndOfBitfield(IOError):
    """Signals that a read was attempted with no bits left in the bitfield."""
    pass
class BitString(object):
    """File-like object for reading and writing individual bits.

    Bits are stored most-significant-bit first in an underlying byte stream
    (``buf``). Writes that do not fill a whole byte wait in a "spill" cache
    until enough bits arrive or ``flush()`` pads them out; reads that consume
    only part of a byte keep the remainder in a "peek" cache.
    """

    # Underlying byte stream; replaced per instance in __init__.
    buf = None
    # Spill cache: bits (fewer than a byte) waiting to be written.
    _spill = 0
    # Number of bits in the spill cache.
    _spillen = 0
    # Peek cache: bits already read from the stream but not yet consumed.
    _peek = 0
    # Number of bits in the peek cache.
    _peeklen = 0

    def __init__(self, source='', buf=None):
        """Create a bit stream.

        source: optional initial data, written to the buffer (the cursor is
                left wherever that write leaves it).
        buf:    optional file-like object to use as storage; a fresh
                StringIO is created when omitted.
        """
        # Compare against None so a supplied buffer that happens to be falsy
        # (e.g. one defining __len__ and currently empty) is not discarded.
        self.buf = StringIO() if buf is None else buf
        if source:
            self.buf.write(source)

    def writebits(self, n, width=None):
        """Write bits to the stream; bits must be supplied as a number.

        n:     non-negative integer holding the bits to write.
        width: number of bits to write. When omitted, the minimal number of
               bits needed to represent ``n`` is used (so 0 writes one bit and
               leading zeros cannot be expressed). Pass ``width`` to write
               leading zero bits.

        Raises ValueError if ``n`` is negative, or if ``width`` is negative
        or too small to hold ``n``.
        """
        if n < 0:
            # A negative value would poison the spill cache and only blow up
            # later, inside flush().
            raise ValueError('cannot write a negative number: %r' % (n,))
        if width is None:
            size = binlen(n)
        else:
            if width < 0 or n >> width:
                raise ValueError('%r does not fit in %r bits' % (n, width))
            size = width
        bits = (self._spill << size) | n
        size += self._spillen  # handles _spill=0 but _spillen > 0
        # Emit whole bytes from the top; whatever is left (< 8 bits) spills.
        while size >= 8:
            size -= 8
            byte = bits >> size
            bits &= mask(size)
            self.buf.write(chr(byte))
        self._spill = bits
        self._spillen = size

    def flush(self):
        "Flushes any pending bits to the stream, zero-padded on the right to a full byte."
        pending = self._spill
        if self._spillen:
            # Left-align the pending bits within the byte.
            pending <<= 8 - self._spillen
            self.buf.write(chr(pending))
        self._spill = 0
        self._spillen = 0

    def truncate(self):
        "Truncates the stream to zero bits and clears both bit caches."
        # File-like truncate() with no argument cuts at the *current* cursor,
        # so rewind first; otherwise nothing is removed after a write.
        self.buf.seek(0)
        self.buf.truncate()
        self._spill = 0
        self._spillen = 0
        self._peek = 0
        self._peeklen = 0

    def seek(self, n, mode=0):
        "Flushes the bit-buffer and moves to the given byte-offset. mode: 0 = absolute, 1 = relative, 2 = relative EOF"
        self.flush()
        # Any partially consumed byte is abandoned when the cursor moves.
        self._peek = 0
        self._peeklen = 0
        self.buf.seek(n, mode)

    def tell(self):
        "Returns the current position of the cursor as a byte offset from the start of the stream."
        return self.buf.tell()

    def readbits(self, n):
        """Read n bits from the stream and return them as a number.

        Returns 0 for ``n == 0``. If fewer than ``n`` bits remain, the bits
        that are available are returned. Raises EndOfBitfield when no bits
        remain at all, and ValueError when ``n`` is negative.
        """
        if n < 0:
            raise ValueError('cannot read a negative number of bits: %r' % (n,))
        if n == 0:
            return 0
        size = self._peeklen
        bits = self._peek
        # Pull whole bytes until the request is covered or the stream ends.
        while size < n:
            byte = self.buf.read(1)
            if not byte:
                break
            size += 8
            bits = (bits << 8) | ord(byte)
        if size > n:
            # Keep the unconsumed low bits for the next read.
            self._peeklen = size - n
            self._peek = bits & mask(self._peeklen)
            bits >>= self._peeklen
        else:
            self._peeklen = 0
            self._peek = 0
        if size == 0:
            raise EndOfBitfield('Cannot read from end of stream')
        return bits

    def peek(self, n):
        """Read the next n bits without moving the cursor.

        Returns 0 for ``n == 0`` (matching readbits). If fewer than ``n``
        bits remain, the bits that are available are returned. Raises
        EndOfBitfield when no bits remain at all, and ValueError when ``n``
        is negative.
        """
        if n < 0:
            raise ValueError('cannot peek a negative number of bits: %r' % (n,))
        if n == 0:
            return 0
        start = self.buf.tell()
        consumed = 0
        size = self._peeklen
        bits = self._peek
        while size < n:
            byte = self.buf.read(1)
            if not byte:
                break
            consumed += 1
            size += 8
            bits = (bits << 8) | ord(byte)
        if size == 0:
            raise EndOfBitfield('Cannot read from end of stream')
        if size > n:
            bits >>= size - n
        if consumed:
            # Rewind with an absolute seek to the saved position.
            self.buf.seek(start)
        return bits

    def iterbits(self, n=1):
        """Iterate over the field, n bits at a time, until the stream is exhausted.

        Raises ValueError (on first iteration) when ``n`` is less than 1,
        since a zero-width read never reaches the end of the stream.
        """
        if n < 1:
            raise ValueError('iterbits requires n >= 1, got %r' % (n,))
        try:
            while True:
                yield self.readbits(n)
        except EndOfBitfield:
            pass

    def bin(self):
        "Dumps the stream as a binary string; flushes the bit-buffer but leaves the cursor and peek cache unchanged."
        self.flush()
        pos = self.buf.tell()
        # Set the peek cache aside: stale bits must not be prepended to the
        # dump, and the caller's read state must survive it.
        saved_peek, saved_peeklen = self._peek, self._peeklen
        self._peek = 0
        self._peeklen = 0
        self.buf.seek(0)
        try:
            s = '0b' + ''.join(str(b) for b in self.iterbits(1))
        finally:
            self._peek, self._peeklen = saved_peek, saved_peeklen
            self.buf.seek(pos)
        return s

    def dump(self):
        "Dumps the stream as a string; flushes the bit-buffer but leaves cursor position unchanged."
        self.flush()
        pos = self.buf.tell()
        self.buf.seek(0)
        s = self.buf.read()
        self.buf.seek(pos)
        return s

    def __repr__(self, dump_buf=True):
        # The name ``bin`` below resolves to the builtin: method names defined
        # in the class body are not visible as bare names inside methods.
        s = self.buf.getvalue()
        return 'BitString(%s, spill[%s]=%s, tell=%r, peek[%s]=%s)' % (
            'buf=%r' % s if dump_buf else 'len(buf)=%r' % len(s),
            self._spillen, bin(self._spill)[2:],
            self.tell(),
            self._peeklen, bin(self._peek)[2:]
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment