Skip to content

Instantly share code, notes, and snippets.

@AlexSnet
Last active August 29, 2015 14:01
Show Gist options
  • Save AlexSnet/1d2b0e2c9bedcc5713be to your computer and use it in GitHub Desktop.
Save AlexSnet/1d2b0e2c9bedcc5713be to your computer and use it in GitHub Desktop.
Binary structures in python
import struct
from collections import namedtuple
from collections import OrderedDict
from _abcoll import *
import _abcoll
from _collections import deque, defaultdict
# The module is spelled ``operator``; the previous misspelling made the
# import (and therefore the whole file) fail.
from operator import itemgetter as _itemgetter, eq as _eq
from keyword import iskeyword as _iskeyword
import sys as _sys
import heapq as _heapq
from itertools import repeat as _repeat, chain as _chain, starmap as _starmap
from itertools import imap as _imap
# Fall back to the dummy implementation when ``thread`` cannot be imported.
try:
    from thread import get_ident as _get_ident
except ImportError:
    from dummy_thread import get_ident as _get_ident
# Source template of the generated structure class.  bstruct() fills the
# placeholders (typename, arg_list, field_names, field_types, num_fields,
# repr_fmt, field_defs) with str.format() and executes the result, so a
# literal brace added to this text would have to be doubled.  The generated
# code expects _tuple, _property, OrderedDict, struct and _mappers in the
# namespace it is executed in.
_class_template = '''\
class {typename}(tuple):
    '{typename}({arg_list})'

    __slots__ = ()

    _fields = {field_names!r}
    _types = ({field_types},)

    def __new__(_cls, {arg_list}):
        'Create new instance of {typename}({arg_list})'
        return _tuple.__new__(_cls, ({arg_list}))

    def __repr__(self):
        'Return a nicely formatted representation string'
        return '{typename}({repr_fmt})' % self

    def _asdict(self):
        'Return a new OrderedDict which maps field names to their values'
        return OrderedDict(zip(self._fields, self))

    def __getnewargs__(self):
        'Return self as a plain tuple. Used by copy and pickle.'
        return tuple(self)

    __dict__ = _property(_asdict)

    def _replace(_self, **kwds):
        'Return a new {typename} object replacing specified fields with new values'
        result = _self._make(map(kwds.pop, {field_names!r}, _self))
        if kwds:
            raise ValueError('Got unexpected field names: %r' % kwds.keys())
        return result

    def __getstate__(self):
        'Exclude the OrderedDict from pickling'
        pass

    @classmethod
    def _make(cls, iterable, new=tuple.__new__, len=len):
        'Make a new {typename} object from a sequence or iterable'
        result = new(cls, iterable)
        if len(result) != {num_fields:d}:
            raise TypeError('Expected {num_fields:d} arguments, got %d' % len(result))
        return result

    @staticmethod
    def _is_format(field_type):
        'True for a struct format string, False for a nested structure class'
        # type(u"") is unicode on Python 2, so both string kinds are accepted
        # by every method instead of only by unpack().
        return isinstance(field_type, (str, type(u'')))

    @staticmethod
    def _arity(fmt):
        'Number of values struct packs or unpacks for the format string'
        # Unpacking a block of pad bytes of the right size tells us how many
        # values the format carries without parsing the format ourselves.
        blank = struct.pack('%dx' % struct.calcsize(fmt))
        return len(struct.unpack(fmt, blank))

    @staticmethod
    def _coerce(fmt, raw):
        'Convert the tuple returned by struct.unpack to a plain value'
        if fmt in ('b', 'B', 'I', 'i', 'H', 'h', 'L', 'l', 'Q', 'q', 'P'):
            return int(raw[0])
        if fmt in ('f', 'd'):
            return float(raw[0])
        if fmt == '?':
            return bool(raw[0])
        if 'c' in fmt or 's' in fmt:
            # character data is handed back as one joined byte string
            return b''.join(raw)
        # any other format keeps the tuple produced by struct.unpack
        return raw

    @classmethod
    def bytesOf(cls, var):
        'Returns bytes of given attribute'
        field_type = cls._types[cls._fields.index(var)]
        if cls._is_format(field_type):
            return struct.calcsize(field_type)
        return field_type.bytes()

    @classmethod
    def bytes(cls):
        'Return total number of bytes for all attributes'
        return sum(cls.bytesOf(name) for name in cls._fields)

    @classmethod
    def unpack(cls, fp):
        'Unpacks structure from file pointer and returns new instance of it.'
        values = dict()
        for pos, name in enumerate(cls._fields):
            field_type = cls._types[pos]
            if cls._is_format(field_type):
                data = fp.read(struct.calcsize(field_type))
                values[name] = cls._coerce(field_type, struct.unpack(field_type, data))
            else:
                values[name] = field_type.unpack(fp)
            mapper = _mappers['in'][pos]
            if mapper:
                values[name] = mapper(values[name])
        return cls(**values)

    def pack(self, fp):
        'Reverse to unpack'
        for pos, (field_type, val) in enumerate(zip(self._types, self)):
            mapper = _mappers['out'][pos]
            if mapper:
                val = mapper(val)
            if not self._is_format(field_type):
                # nested structure writes itself
                val.pack(fp)
                continue
            if isinstance(val, (tuple, list)):
                # value kept as the tuple struct.unpack produced
                args = val
            elif self._arity(field_type) > 1:
                # joined character data such as 'cccc': one byte per item
                args = [val[k:k + 1] for k in range(len(val))]
            else:
                args = (val,)
            # Errors from struct.pack propagate; silently skipping the field
            # would produce a truncated, corrupt record.
            fp.write(struct.pack(field_type, *args))

{field_defs}
'''
# One "name=%r" fragment per field; bstruct() joins the fragments with ", "
# to build the %-format string used by the generated __repr__.
_repr_template = '{name}=%r'

# Source of one read-only property that exposes a field by tuple index.
# The line is indented so that it lands inside the class body produced from
# _class_template.
_field_template = '''\
    {name} = _property(_itemgetter({index:d}), doc='Alias for field number {index:d}')
'''
def bstruct(typename, field_names, field_types=None, field_map_in=None, field_map_out=None, verbose=False, rename=False):
    r"""Create a namedtuple-like class that reads and writes binary data.

    ``field_names`` is either a sequence (or a whitespace/comma separated
    string) of names, with the matching entries in ``field_types``, or a
    tuple of ``(name, type[, map_in[, map_out]])`` tuples that carries the
    whole description.  Each type is a ``struct`` format string or a class
    previously returned by ``bstruct`` (a nested structure).

    ``field_map_in`` and ``field_map_out`` are optional per-field lists of
    callables (``None`` means "no mapping"); the former is applied to a
    value right after unpacking, the latter right before packing.  Lists
    shorter than the number of fields are padded with ``None``; the
    caller's lists are never modified.

    ``verbose`` prints the generated class source.  ``rename`` replaces
    invalid or duplicate field names with ``_<index>`` instead of raising.

    Raises ``TypeError`` if no field types are supplied and ``ValueError``
    if names and types differ in length or a name is invalid or duplicated.

    >>> from StringIO import StringIO
    >>> A = bstruct('A', 'a b c d', 'c I cccc c')
    >>> x = A.unpack(StringIO('\x47\x00\x01\x01\x00\x20\x21\x22\x23\x30'))
    >>> print(x)
    A(a='G', b=65792, c=' !"#', d='0')
    >>> b = bstruct('B', 'a version', [A, 'i'])
    >>> s = StringIO('\x47\x00\x01\x01\x00\x20\x21\x22\x23\x30\x01\x00\x00\x00')
    >>> z = b.unpack(s)
    >>> print(z)
    B(a=A(a='G', b=65792, c=' !"#', d='0'), version=1)
    >>> ss = StringIO()
    >>> z.pack(ss)
    >>> ss.seek(0)
    >>> s.seek(0)
    >>> s.read() == ss.read()
    True
    """
    # Specification form: a tuple whose entries are themselves tuples/lists.
    # A tuple of plain name strings is NOT a specification and falls through
    # to the ordinary handling below.
    is_spec = isinstance(field_names, tuple) and all(
        isinstance(spec, (tuple, list)) for spec in field_names)
    if is_spec:
        specs = field_names
        field_names = [spec[0] for spec in specs]
        field_types = [spec[1] for spec in specs]
        field_map_in = [spec[2] if len(spec) > 2 else None for spec in specs]
        field_map_out = [spec[3] if len(spec) == 4 else None for spec in specs]

    # Validate the field names.  At the user's option, either generate an error
    # message or automatically replace the field name with a valid name.
    if isinstance(field_names, basestring):
        field_names = field_names.replace(',', ' ').split()
    field_names = [str(name) for name in field_names]

    if isinstance(field_types, basestring):
        field_types = field_types.replace(',', ' ').split()
    if field_types is None:
        raise TypeError('field_types is required unless field_names is a '
                        'tuple of (name, type, ...) tuples')
    field_types = list(field_types)
    if len(field_names) != len(field_types):
        raise ValueError('Length of fields must be equal to length of field types.')

    # Work on private copies of the mapper lists, padded to one entry per
    # field, so the generated class can index them by field position.
    num_fields = len(field_names)
    field_map_in = list(field_map_in or ())
    field_map_in.extend([None] * (num_fields - len(field_map_in)))
    field_map_out = list(field_map_out or ())
    field_map_out.extend([None] * (num_fields - len(field_map_out)))

    if rename:
        seen = set()
        for index, name in enumerate(field_names):
            if (not all(c.isalnum() or c == '_' for c in name)
                    or _iskeyword(name)
                    or not name
                    or name[0].isdigit()
                    or name.startswith('_')
                    or name in seen):
                field_names[index] = '_%d' % index
            seen.add(name)
    for name in [typename] + field_names:
        if not all(c.isalnum() or c == '_' for c in name):
            raise ValueError('Type names and field names can only contain '
                             'alphanumeric characters and underscores: %r' % name)
        if _iskeyword(name):
            raise ValueError('Type names and field names cannot be a '
                             'keyword: %r' % name)
        if name[0].isdigit():
            raise ValueError('Type names and field names cannot start with '
                             'a number: %r' % name)
    seen = set()
    for name in field_names:
        if name.startswith('_') and not rename:
            raise ValueError('Field names cannot start with an underscore: '
                             '%r' % name)
        if name in seen:
            raise ValueError('Encountered duplicate field name: %r' % name)
        seen.add(name)

    # Namespace the generated source is executed in; setting __name__
    # supports tracing utilities that read frame.f_globals['__name__'].
    namespace = dict(_itemgetter=_itemgetter, __name__='bstruct_%s' % typename,
                     OrderedDict=OrderedDict, _property=property, _tuple=tuple,
                     struct=struct, _mappers={'in': field_map_in, 'out': field_map_out})

    # Format strings are written into the source as literals.  Nested
    # structure classes are referenced through a namespace alias that
    # includes the field position, so two classes sharing a __name__ (or a
    # class named like one of the helper names above) cannot clash.
    type_exprs = []
    for index, field_type in enumerate(field_types):
        if isinstance(field_type, basestring):
            type_exprs.append(repr(str(field_type)))
        else:
            alias = '_%d_%s' % (index, field_type.__name__)
            namespace[alias] = field_type
            type_exprs.append(alias)

    # Fill-in the class template
    class_definition = _class_template.format(
        typename=typename,
        field_names=tuple(field_names),
        field_types=', '.join(type_exprs),
        num_fields=num_fields,
        arg_list=repr(tuple(field_names)).replace("'", "")[1:-1],
        repr_fmt=', '.join(_repr_template.format(name=name)
                           for name in field_names),
        field_defs='\n'.join(_field_template.format(index=index, name=name)
                             for index, name in enumerate(field_names))
    )
    if verbose:
        print(class_definition)

    # Execute the template string in the temporary namespace.
    try:
        exec(class_definition, namespace)
    except SyntaxError as e:
        # e.msg holds the compiler's description of the problem.
        raise SyntaxError(e.msg + ':\n' + class_definition)
    result = namespace[typename]

    # For pickling to work, the __module__ variable needs to be set to the frame
    # where the named tuple is created.  Bypass this step in environments where
    # sys._getframe is not defined (Jython for example) or sys._getframe is not
    # defined for arguments greater than 0 (IronPython).
    try:
        result.__module__ = _sys._getframe(1).f_globals.get('__name__', '__main__')
    except (AttributeError, ValueError):
        pass

    return result
if __name__ == '__main__':
    # Demo: unpack a flat and a nested structure, then check that packing
    # the nested one reproduces the bytes it was read from.
    from StringIO import StringIO

    # Flat structure; verbose=True also prints the generated class source.
    A = bstruct('A', 'a b c d', 'c I cccc c', verbose=True)
    flat_stream = StringIO('\x47\x00\x01\x01\x00\x20\x21\x22\x23\x30')
    x = A.unpack(flat_stream)
    print(x)

    # Structure that embeds A and is followed by an int field.
    b = bstruct('B', 'a version', [A, 'i'])
    s = StringIO('\x47\x00\x01\x01\x00\x20\x21\x22\x23\x30\x01\x00\x00\x00')
    z = b.unpack(s)
    print(z)

    # Write the nested record out again and compare with the input bytes.
    ss = StringIO()
    z.pack(ss)
    for stream in (ss, s):
        stream.seek(0)
    round_trip_ok = s.read() == ss.read()
    print(' '.join(('Unpack test:', str(round_trip_ok))))
Use C structs as easily as in C.
A simple pure-Python version.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment