Skip to content

Instantly share code, notes, and snippets.

@tonyg
Last active August 11, 2019 19:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tonyg/af3c19f64a68d8a91c1fea7d181467f1 to your computer and use it in GitHub Desktop.
Save tonyg/af3c19f64a68d8a91c1fea7d181467f1 to your computer and use it in GitHub Desktop.
# Simple Canonical S-Expression codec.
# https://people.csail.mit.edu/rivest/Sexp.txt
# Apache v2 Licenced. Copyright 2018-2019 Tony Garnock-Jones
import base64
import zlib
try:
unicode
except NameError:
unicode = str
def decode(bs):
(v,tail) = decodestream(bs)
if tail != b'':
raise ValueError('Trailing junk in canonical S-expression')
return v
def decodestream(bs):
if bs[0:1] == b'(':
bs = bs[1:]
acc = []
while bs[0:1] != b')':
(v,bs) = decodestream(bs)
acc.append(v)
bs = bs[1:]
return (acc,bs)
else:
return _decodestr(bs)
def _decodestr(bs):
count = 0
while bs[count:count+1].isdigit():
count = count + 1
if bs[count:count+1] != b':':
raise ValueError("Invalid canonical S-expression")
strlen = int(bs[0:count])
bs = bs[count+1:]
return (bs[:strlen], bs[strlen:])
def encode(v):
if isinstance(v, bytes):
return str(len(v)).encode('us-ascii') + b':' + v
elif isinstance(v, list) or isinstance(v, tuple):
return b'(' + b''.join((encode(w) for w in v)) + b')'
else:
raise ValueError("Unsupported value encoding canonical S-expression")
def armor(sexp, compress=True):
bs = encode(sexp)
if compress: bs = b'gz' + zlib.compress(bs)
bs = base64.b64encode(bs)
return bs.rstrip(b'=')
def unarmor(bs):
bs = bs + b'=' * (-len(bs) % 4) ## re-pad
bs = base64.b64decode(bs)
if bs[0:2] == b'gz': bs = zlib.decompress(bs[2:])
return decode(bs)
class Registry(object):
def __init__(self):
self._labelmap = {}
def record(self, label, baseclass):
import inspect
def add_decoder(fun):
if isinstance(fun, staticmethod):
fun = fun.__func__
arity = len(inspect.getargspec(fun).args)
self._labelmap[(label, baseclass, arity)] = fun
return fun
return add_decoder
def deserialize(self, sexp, baseclass):
if not isinstance(sexp, list) or len(sexp) == 0:
raise ValueError('Cannot deserialize %r' % (sexp,))
key = (sexp[0], baseclass, len(sexp) - 1)
if key not in self._labelmap:
raise ValueError('Cannot deserialize %r' % (sexp,))
fun = self._labelmap[key]
v = fun(*sexp[1:])
if not isinstance(v, baseclass):
raise ValueError('Expected %s; got %r' % (baseclass, v))
return v
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment