Skip to content

Instantly share code, notes, and snippets.

@nwalker
Created March 24, 2013 19:42
Show Gist options
  • Save nwalker/5233215 to your computer and use it in GitHub Desktop.
Save nwalker/5233215 to your computer and use it in GitHub Desktop.
Чем лучше заменить длинные свитчи? В _read так и просится dict { tag: handler }, но есть нюанс - его неудобно расширять в сабклассе. Что делать с _write(особенно, см. #195)?
ettAtom = 'd'
# ettAtomUTF8 = 'v' # this is beyond retarded
# ettBinary = 'm'
# ettBitBinary = 'M'
# ettCachedAtom = 'C'
# ettCacheRef = 'R'
# ettExport = 'q'
ettFloat = 'c'
# ettFun = 'u'
ettInteger = 'b'
# ettLargeBig = 'o'
ettLargeTuple = 'i'
ettList = 'l'
# ettNewCache = 'N'
# ettNewFloat = 'F'
# ettNewFun = 'p'
ettNewRef = 'r'
ettNil = 'j'
ettPid = 'g'
# ettPort = 'f'
ettRef = 'e'
ettSmallAtom = 's'
# ettSmallAtomUTF8 = 'w' # this is beyond retarded
# ettSmallBig = 'n'
ettSmallInteger = 'a'
ettSmallTuple = 'h'
ettString = 'k'
class Atom(str):
def __repr__(self):
return '<atom: %s>' % self
class UAtom(unicode):
def __repr__(self):
return '<atom_utf: %s>' % self
AtomTypes = (Atom, UAtom)
class String(str):
def as_list(self):
return [ord(x) for x in self]
def __repr__(self):
return '<etf_string: %s%r>' % (self, self.as_list())
class Ref(object):
def __init__(self, node, ids, creation):
self.node = node
self.ids = ids
self.creation = creation
def __repr__(self):
return '<Ref: %s,id=%d>' % (self.node, self.ids[0])
class Pid(object):
def __init__(self, node, id, serial, creation):
self._node = node
self._id = id
self._serial = serial
self._creation = creation
self._hash = hash((node, id, serial, creation))
@property
def node(self):
return self._node
def __repr__(self):
return '<Pid: <%s.%d.%d>>' % (self._node, self._id, self._serial)
def __eq__(self, other):
return type(other) is Pid and hash(other) == hash(self)
def __hash__(self):
return self._hash
class StupidETFContext(object):
def read_terms(self, buf):
terms = []
while not buf.eof():
term_tag = buf.get('!B')
if term_tag != 131:
raise ValueError('wrong term_tag=%d at %d' %
(term_tag, buf.offset))
terms.append(self._read(buf))
return terms
def _read(self, buf):
get = buf.get
tag = get('!c')
if tag is ettSmallInteger:
term = get('!B')
elif tag is ettInteger:
term = get('!i')
elif tag is ettFloat:
_term = get('!31s')
term = float(_term.rstrip('\x00'))
elif tag is ettAtom:
_len = get('!H')
_term = buf.raw(_len)
term = Atom(_term)
elif tag is ettSmallTuple or tag is ettLargeTuple:
arity = get('!B') if tag is ettSmallTuple else get('!I')
_term = []
for _ in xrange(arity):
t = self._read(buf)
_term.append(t)
term = tuple(_term)
elif tag is ettString:
_len = get('!H')
term = String(buf.raw(_len))
elif tag is ettList:
arity = get('!I')
term = []
for _ in range(arity):
term.append(self._read(buf))
self._read(buf) # tail, dropped
elif tag is ettRef:
node = self._read(buf)
id, creation = get('!IB')
term = Ref(node, [id], creation)
elif tag is ettNewRef:
_len = get('!H')
node = self._read(buf)
creation = get('!B')
ids = get('!%dI' % _len, cache=False)
term = Ref(node, ids, creation)
elif tag is ettPid:
node = self._read(buf)
id, serial, creation = get('!IIB')
term = Pid(node, id, serial, creation)
elif tag is ettNil:
term = []
else:
raise ValueError('unhandled tag=%s(%d) at %d' %
(tag, ord(tag), buf.offset))
return term
def write_terms(self, terms, buf):
for term in terms:
buf.put('!B', 131) # term tag
self._write(term, buf)
def _write(self, item, buf):
put = buf.put
if isinstance(item, int):
if 0 <= item < 256:
put('!cB', ettSmallInteger, item)
else:
put('!i', item)
elif isinstance(item, float):
data = '%.20e' % item
put('!c', ettFloat)
buf.raw(data)
buf.raw(chr(0) * (31 - len(data))) # pad to 31 byte
elif isinstance(item, Atom):
put('!cH', ettAtom, len(item))
buf.raw(item)
elif isinstance(item, Pid):
put('!c', ettPid)
self._write(Atom(item._node), buf)
put('!IIB', item._id, item._serial, item._creation)
elif isinstance(item, Ref):
_len = len(item.ids)
put('!cH', ettNewRef, _len)
self._write(Atom(item.node), buf)
put('!B', item.creation)
put('!%sI' % _len, *item.ids)
elif isinstance(item, tuple):
_len = len(item)
if _len > 255:
put('!cB', ettSmallTuple, _len)
else:
put('!cI', ettLargeTuple, _len)
for i in item:
self._write(i, buf)
elif item == []:
put('!c', ettNil)
elif isinstance(item, list):
put('!cI', ettList, len(item))
for i in item:
self._write(i, buf)
self._write([], buf)
else:
raise ValueError('cannot encode value of type %r' % type(item))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment