Skip to content

Instantly share code, notes, and snippets.

@Higgs1
Last active July 13, 2016 22:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Higgs1/c0066abd1cc2f2f5435d to your computer and use it in GitHub Desktop.
Save Higgs1/c0066abd1cc2f2f5435d to your computer and use it in GitHub Desktop.
My current Python Data Stream Reader
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from functools import partialmethod
import struct, math, io
def _boc(end):
    """Return the ``struct`` byte-order prefix character for ``end``.

    ``'little'`` maps to ``'<'`` and ``'big'`` maps to ``'>'``; any other
    value (including ``None``) maps to ``'='``, which ``struct`` interprets
    as native byte order with standard sizes.
    """
    # Compare by value, not identity: ``is`` only works for strings that
    # happen to be interned, so an equal string built at runtime (read from
    # a file, joined, sliced, ...) would otherwise fall through to '='.
    if end == 'little':
        return '<'
    if end == 'big':
        return '>'
    return '='
def _rbitshift(val, bits):
    """Split ``val`` at bit position ``bits``.

    Returns a ``(high, low)`` tuple where ``low`` holds the ``bits``
    least-significant bits of ``val`` and ``high`` is ``val`` shifted right
    by ``bits``.
    """
    high = val >> bits
    low_mask = (1 << bits) - 1
    low = val & low_mask
    return high, low
# Public API of this module: only the reader class is exported by
# ``from module import *``; the underscore-prefixed helpers stay internal.
__all__ = ['DataReader']
# TODO: make more generalized. Signing method should be
# completely independent of number format.
class DataReader(io.BufferedReader):
    """Buffered binary reader with helpers for decoding common encodings.

    On top of ``io.BufferedReader`` this adds decoders for fixed-width
    integers, IEEE 754 style floats, NUL-terminated and length-prefixed
    byte strings, and variable-length base-128 integers.

    ``byteorder`` (``'little'`` or ``'big'``) is the default byte order used
    by every decoding method that is not given an explicit one; it falls
    back to ``'little'`` when omitted.
    """

    # Exponent widths of the standard IEEE 754 binary interchange formats,
    # keyed by storage size in bytes (binary16/32/64/128).
    _IEEE754_EXPBITS = {2: 5, 4: 8, 8: 11, 16: 15}

    def __init__(self, raw, byteorder=None):
        super().__init__(raw)
        self.byteorder = byteorder or 'little'

    @classmethod
    def open(cls, file, byteorder=None, *args, **kwargs):
        """Open ``file`` in binary read mode and wrap it in a reader.

        Extra positional and keyword arguments are forwarded to the builtin
        ``open`` after the mode.
        """
        # The mode is passed positionally so that forwarded positional
        # arguments land on the parameters that follow it instead of
        # colliding with a ``mode=`` keyword.
        return cls(open(file, 'rb', *args, **kwargs), byteorder)

    @classmethod
    def from_bytes(cls, bytes, byteorder=None):
        """Create a reader over an in-memory ``bytes`` (or ``str``) value.

        A ``str`` is encoded as ISO-8859-1 first, so each code point
        0-255 becomes the byte with the same value.
        """
        if isinstance(bytes, str):
            bytes = bytes.encode('iso-8859-1')
        return cls(io.BytesIO(bytes), byteorder)

    def read(self, n=-1):
        """Read up to ``n`` bytes, raising at end of stream.

        Raises:
            StopIteration: if ``n`` is truthy and no bytes could be read.
                Kept for backward compatibility with existing callers; be
                aware that Python turns a StopIteration escaping a
                generator body into RuntimeError (PEP 479).
        """
        data = super().read(n)
        if n and not data:
            raise StopIteration
        return data

    def unpack(self, fmt, byteorder=None):
        """Read and decode a ``struct`` format string.

        ``fmt`` must not carry its own byte-order prefix; one is prepended
        from ``byteorder`` (or the reader's default). Returns the single
        decoded value when the format yields exactly one item, otherwise
        the tuple of values.
        """
        fmt = _boc(byteorder or self.byteorder) + fmt
        ret = struct.unpack(fmt, self.read(struct.calcsize(fmt)))
        return ret[0] if len(ret) == 1 else ret

    # Fixed length IEEE 754 number decoding methods.
    # TODO: move logic to a separate IEEE 754 class.
    # TODO: fully test if works.
    def read_ieee754(self, size=4, expbits=None, byteorder=None, *,
                     signed=True, expbias=None):
        """Read a ``size``-byte binary floating point number.

        Args:
            size: storage width in bytes.
            expbits: width of the exponent field in bits. When omitted the
                standard IEEE 754 width for ``size`` is used (5, 8, 11, 15
                bits for 2, 4, 8, 16 bytes); other sizes fall back to
                ``round(4 * log2(size)) - 1``.
            byteorder: byte order of the stored value; defaults to the
                reader's byte order.
            signed: whether the most significant bit is a sign bit.
            expbias: exponent bias; defaults to ``2**(expbits - 1) - 1``.

        Returns:
            The decoded value as a Python number (``inf`` / ``nan`` for an
            all-ones exponent).
        """
        if not expbits:
            expbits = self._IEEE754_EXPBITS.get(size)
            if expbits is None:
                # Generic rule; only matches the standard for >= 8 bytes,
                # which is why the common small sizes use the table above.
                expbits = max(0, round(4 * math.log2(size)) - 1)
        # ``is None`` rather than truthiness: a bias of 0 is a valid value.
        if expbias is None:
            expbias = 2**(expbits - 1) - 1
        # ``signed`` is a bool and contributes one bit when True.
        fracbits = size * 8 - expbits - signed

        bits = self.read_uint(size, byteorder)
        # Peel the fields off from the least significant end:
        # fraction, then exponent, leaving the sign (if any) on top.
        frac = (bits & ((1 << fracbits) - 1)) * 2**-fracbits
        bits >>= fracbits
        exp = bits & ((1 << expbits) - 1)
        sign = bits >> expbits

        if exp == 0:  # Denormal numbers
            val = frac * 2**(1 - expbias)
        elif exp + 1 == 2**expbits:  # Infinity / NaN
            val = float('nan' if frac else 'inf')
        else:  # Normalized value
            val = (1 + frac) * 2**(exp - expbias)
        return -val if signed & sign else val

    read_half = partialmethod(read_ieee754, 2, 5)
    read_float = partialmethod(read_ieee754, 4, 8)
    read_double = partialmethod(read_ieee754, 8)

    # Fixed length base 256 number decoding methods.
    def read_uint(self, size=4, byteorder=None, *, signed=False):
        """Read a ``size``-byte integer (unsigned unless ``signed`` is set)."""
        return int.from_bytes(self.read(size), signed=signed,
                              byteorder=byteorder or self.byteorder)

    read_ubyte = partialmethod(read_uint, 1)
    read_ushort = partialmethod(read_uint, 2)
    read_ulong = partialmethod(read_uint, 8)
    read_int = partialmethod(read_uint, signed=True)
    read_byte = partialmethod(read_int, 1)
    read_short = partialmethod(read_int, 2)
    read_long = partialmethod(read_int, 8)

    # String decoding methods.
    # TODO: generalize into 'read until str' method?
    def read_cstr(self, byteorder=None):
        """Read bytes up to a NUL terminator and return them without it.

        ``byteorder`` is accepted for signature symmetry but not used.
        """
        # Accumulate in a bytearray: repeated ``bytes +=`` copies the whole
        # buffer on every byte read.
        buf = bytearray()
        while True:
            byte = self.read(1)
            if byte == b'\x00':
                return bytes(buf)
            buf += byte

    def read_pstr(self, size=2, byteorder=None):
        """Read a byte string prefixed by a ``size``-byte unsigned length."""
        return self.read(self.read_uint(size, byteorder))

    # Variable length base 128 number decoding methods.
    # TODO: implement maxsize?
    # Maxsize = if a number reaches maxsize length,
    # then the last byte is fully read- no need to check high bit.
    # TODO: implement minsize?
    # Minsize = like maxsize, the first minsize-1 bytes are fully read.
    def read_uintvar(self, byteorder=None, *, signed=False):
        """Read a variable-length base-128 integer.

        Each byte carries 7 payload bits; the high bit (0x80) marks that
        another byte follows. With ``byteorder == 'little'`` the first byte
        holds the least significant group, with ``'big'`` the most
        significant one.

        When ``signed`` is true, bit 0x40 of the first byte is taken as a
        sign flag and only its low 6 bits are payload; the result is the
        negated magnitude when the flag is set.
        NOTE(review): this is a sign-magnitude layout, not the two's
        complement layout of standard signed LEB128, and in little-endian
        mode the following groups still start at bit 7 -- confirm against
        the producers of the data before relying on ``read_leb128``.

        Raises:
            ValueError: if the byte order is neither 'little' nor 'big'.
        """
        byteorder = byteorder or self.byteorder
        # Validate before consuming input; previously an unrecognised (or
        # merely non-interned) byte order silently returned only the first
        # byte's payload.
        if byteorder not in ('little', 'big'):
            raise ValueError("byteorder must be either 'little' or 'big'")

        byte = self.read_ubyte()
        if signed:
            val = byte & 63
            negative = byte & 64
        else:
            val = byte & 127
            negative = False

        if byteorder == 'little':
            shift = 7
            while byte & 128:
                byte = self.read_ubyte()
                val += (byte & 127) << shift
                shift += 7
        else:
            while byte & 128:
                byte = self.read_ubyte()
                val = (val << 7) + (byte & 127)
        return -val if negative else val

    read_intvar = partialmethod(read_uintvar, signed=True)
    read_leb128 = partialmethod(read_intvar, 'little')
    read_uleb128 = partialmethod(read_uintvar, 'little')
    read_uvlq = partialmethod(read_uintvar, 'big')
    read_vlq = partialmethod(read_uvlq, signed=True)
@Higgs1
Copy link
Author

Higgs1 commented Jun 24, 2015

A fairly general-purpose data reader for Python. It's what I currently use for various things. I plan on making it more generalized eventually!
See https://docs.google.com/document/d/1qRY02Hoj1xAS-12rW5FTAh0UMRMHf-v3QYlvRsBI5ig for my current thoughts.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment