# newtypes prototype @601942430f40
# GitHub gist davidscherer/7320007 by @davidscherer, created November 5, 2013 14:41.
import struct, random
from bisect import bisect_left
class Types (object):
    """Registry that routes values to encoders and encoded strings to decoders.

    ``encoders`` maps an exact Python type (the result of ``type(val)``) to a
    codec object.  ``decoders`` maps the type code found at the start of an
    encoded string to the object able to decode it.  A value whose type has no
    registered encoder can still be encoded when it provides the matching
    ``encode_*`` method itself.
    """

    def __init__(self):
        self.encoders = {}
        self.decoders = {}

    def _dispatch_encode(self, val, method):
        # Prefer the codec registered for the exact type of val; otherwise
        # fall back to a method of the same name on the value itself.
        codec = self.encoders.get(type(val))
        if codec:
            return getattr(codec, method)(val)
        if hasattr(val, method):
            return getattr(val, method)()
        raise TypeError("Unsupported data type: " + str(type(val)))

    def _dispatch_decode(self, encoded, pos, method):
        # The element at `pos` is the type code that selects the decoder.
        codec = self.decoders.get(encoded[pos])
        if codec:
            return getattr(codec, method)(encoded, pos)
        raise ValueError("Unknown type code: " + repr(encoded[pos:]))

    def encode_key(self, val):
        """Return the ordered, non-self-terminating encoding of val."""
        return self._dispatch_encode(val, 'encode_ordered')

    def decode_key(self, encoded_key):
        """Decode a whole string produced by encode_key and return the value."""
        return self._dispatch_decode(encoded_key, 0, 'decode_ordered')

    def encode_value(self, val):
        """Return the unordered, non-self-terminating encoding of val."""
        return self._dispatch_encode(val, 'encode_unordered')

    def decode_value(self, encoded_key):
        """Decode a whole string produced by encode_value and return the value."""
        return self._dispatch_decode(encoded_key, 0, 'decode_unordered')

    def encode_key_part(self, val):
        """Return the ordered, self-terminating encoding of val."""
        return self._dispatch_encode(val, 'encode_ordered_selfterm')

    def decode_key_part(self, encoded_key, pos):
        """Decode the ordered self-terminating value starting at pos.

        Returns whatever the selected decoder's decode_ordered_selfterm returns
        (for the codecs in this file, a (value, next position) pair).
        """
        return self._dispatch_decode(encoded_key, pos, 'decode_ordered_selfterm')

    def encode_value_part(self, val):
        """Return the unordered, self-terminating encoding of val."""
        return self._dispatch_encode(val, 'encode_unordered_selfterm')

    def decode_value_part(self, encoded_key, pos):
        """Decode the unordered self-terminating value starting at pos."""
        return self._dispatch_decode(encoded_key, pos, 'decode_unordered_selfterm')


# Default, module-wide type system that the codecs below register with.
types = Types()
class TypeCodec (object):
    """Base class for a codec that encodes and decodes one Python type."""
    # A codec provides four different encodings of a given type, and four corresponding decoders
    # The four encodings are for the different combinations of [ordered,unordered] x [self-terminating, non-self-terminating]
    # A minimal implementation only needs to override type, typecode, encode_ordered_selfterm and decode_ordered_selfterm,
    # and the other three encodings will automatically be done identically.  To save space, more compact
    # encodings of some of the other encodings may be implemented.
    typecode = None
    type = None

    def __init__(self, types=types):
        """Register this codec with `types`.

        The instance becomes the encoder for ``self.type`` and the decoder for
        ``self.typecode`` in the given type system, which is also kept as
        ``self.typesystem`` for codecs that need to encode nested values.
        """
        self.typesystem = types
        types.encoders[self.type] = self
        types.decoders[self.typecode] = self

    def encode_ordered_selfterm(self, v):
        """Returns a string s representing v such that:
        s.startswith(self.typecode) and
        self.decode_ordered_selfterm(s,0)==(v,len(s)) and
        encode_ordered_selfterm(a)<encode_ordered_selfterm(b) == a<b

        Must be overridden; the base implementation raises NotImplementedError."""
        # NotImplemented is a comparison sentinel and is not callable; the
        # exception meant for "subclass must override" is NotImplementedError.
        raise NotImplementedError()

    def decode_ordered_selfterm(self, v, pos):
        """Decode the value whose type code is at v[pos]; return (value, next position).

        Must be overridden; the base implementation raises NotImplementedError."""
        raise NotImplementedError()

    def encode_ordered(self, v):
        """Returns a string s representing v such that:
        s.startswith(self.typecode) and
        self.decode_ordered(s,0) == v and
        encode_ordered(a)<encode_ordered(b) == a<b"""
        # This can be overridden for efficiency; the default is to use encode_ordered_selfterm
        return self.encode_ordered_selfterm(v)

    def decode_ordered(self, v, pos):
        """Decode a value written by encode_ordered starting at pos; return the value only."""
        return self.decode_ordered_selfterm(v, pos)[0]

    def encode_unordered_selfterm(self, v):
        """Returns a string s representing v such that:
        s.startswith(self.typecode) and
        self.decode_unordered_selfterm(s,0)==(v,len(s))"""
        # This can be overridden for efficiency; the default is to use encode_ordered_selfterm
        return self.encode_ordered_selfterm(v)

    def decode_unordered_selfterm(self, v, pos):
        """Decode a value written by encode_unordered_selfterm; return (value, next position)."""
        return self.decode_ordered_selfterm(v, pos)

    def encode_unordered(self, v):
        """Returns a string s representing v such that:
        s.startswith(self.typecode) and
        self.decode_unordered(s,0) == v"""
        # This can be overridden for efficiency; the default is to use encode_ordered
        return self.encode_ordered(v)

    def decode_unordered(self, v, pos):
        """Decode a value written by encode_unordered starting at pos; return the value only."""
        return self.decode_ordered(v, pos)
class NoneCodec (TypeCodec):
    """Codec for None: the entire encoding is the single type-code byte."""
    type = type(None)
    typecode = "\x00"

    def encode_ordered_selfterm(self, v):
        # None carries no payload, so the type code alone is the encoding.
        return "\x00"

    def decode_ordered_selfterm(self, v, pos):
        # Exactly one byte (the type code) is consumed.
        next_pos = pos + 1
        return None, next_pos


# Instantiating registers the codec with the default type system.
NoneCodec()
# TODO: Tweak?
def _encodeUint7bit(l):
    """Encode the non-negative integer l as a little-endian base-128 string.

    Seven bits are emitted per character, least-significant group first.
    Every character except the last has its high bit (0x80) set.
    """
    pieces = []
    remaining = l
    while remaining >= 128:
        # Low seven bits plus the continuation flag.
        pieces.append(chr((remaining & 0x7f) | 0x80))
        remaining >>= 7
    # Final group: high bit clear marks the end of the number.
    pieces.append(chr(remaining))
    return "".join(pieces)
def _decodeUint7bit(v,pos):
    """Decode a little-endian base-128 integer from v starting at index pos.

    Returns (value, index of the LAST character of the number).  Note that the
    returned index points at the final character, not one past it.
    """
    total = 0
    shift = 0
    current = ord(v[pos])
    while current >= 128:
        # High bit set: more groups follow; accumulate the low seven bits.
        total += (current & 0x7f) << shift
        shift += 7
        pos += 1
        current = ord(v[pos])
    return total + (current << shift), pos
class ByteStrCodec (TypeCodec):
    """Codec for byte strings.

    The ordered self-terminating form escapes every NUL as NUL + 0xFF and ends
    with a bare NUL.  The ordered whole-value form is the type code followed by
    the raw bytes.  The unordered self-terminating form is length-prefixed.
    """
    type = bytes
    typecode = "\x01"

    def encode_ordered_selfterm(self, v):
        escaped = v.replace("\x00", "\x00\xFF")
        return self.typecode + escaped + "\x00"

    def decode_ordered_selfterm(self, v, pos):
        start = pos + 1  # skip the type code
        end = ByteStrCodec._find_terminator( v, start )
        payload = v[start:end].replace("\x00\xff", "\x00")
        # Step over the terminating NUL as well.
        return payload, end + 1

    def encode_ordered(self, v):
        # Nothing follows a whole-value encoding, so no escaping or terminator.
        return self.typecode + v

    def decode_ordered(self, v, pos):
        return v[pos+1:]

    def encode_unordered_selfterm(self, v):
        length_prefix = _encodeUint7bit(len(v))
        return self.typecode + length_prefix + v

    def decode_unordered_selfterm(self, v, pos):
        # _decodeUint7bit returns the index of the last length character,
        # so the payload begins one position after it.
        l, pos = _decodeUint7bit(v, pos+1)
        start = pos + 1
        end = start + l
        return v[start:end], end

    @staticmethod
    def _find_terminator( v, pos ):
        # Finds the start of the next terminator [\x00]![\xff] or the end of v
        size = len(v)
        candidate = v.find('\x00', pos)
        while candidate >= 0:
            follower = candidate + 1
            if follower == size or v[follower] != '\xff':
                return candidate
            # NUL followed by 0xFF is an escaped NUL; keep searching after it.
            candidate = v.find('\x00', candidate + 2)
        return size


# Instantiating registers the codec with the default type system.
ByteStrCodec()
class UnicodeCodec( ByteStrCodec ):
    """Codec for unicode strings: UTF-8 bytes run through the ByteStrCodec encodings."""
    type = unicode
    typecode = "\x02"

    def encode_ordered_selfterm(self, v):
        utf8 = v.encode("utf-8")
        return ByteStrCodec.encode_ordered_selfterm(self, utf8)

    def decode_ordered_selfterm(self, v, pos):
        raw, end = ByteStrCodec.decode_ordered_selfterm(self, v, pos)
        return raw.decode("utf-8"), end

    def encode_ordered(self, v):
        utf8 = v.encode("utf-8")
        return ByteStrCodec.encode_ordered(self, utf8)

    def decode_ordered(self, v, pos):
        raw = ByteStrCodec.decode_ordered(self, v, pos)
        return raw.decode("utf-8")

    # TODO: Use a different unordered selfterm encoding because UTF-8 will not normally have
    # lots of nulls?
    def encode_unordered_selfterm(self, v):
        utf8 = v.encode("utf-8")
        return ByteStrCodec.encode_unordered_selfterm(self, utf8)

    def decode_unordered_selfterm(self, v, pos):
        raw, end = ByteStrCodec.decode_unordered_selfterm(self, v, pos)
        return raw.decode("utf-8"), end


# Instantiating registers the codec with the default type system.
UnicodeCodec()
class IntegerCodec( TypeCodec ):
    """Codec for int and long values whose magnitude fits in eight bytes.

    The type code is 20 plus the payload length for positive numbers and
    20 minus the payload length for negative numbers; zero is the bare code 20.
    The payload is big-endian.  For a negative number the payload stores
    (largest value of that length) + v.
    """
    types = [int, long]
    typecodes = range(12, 29)
    # _size_limits[n] is the largest unsigned value representable in n bytes.
    _size_limits = tuple( 256 ** width - 1 for width in range(9) )

    def __init__(self, types):
        # One codec instance serves several Python types and type codes, so
        # registration is done here instead of in TypeCodec.__init__.
        for py_type in self.types:
            types.encoders[py_type] = self
        for code in self.typecodes:
            types.decoders[chr(code)] = self

    def encode_ordered_selfterm(self, v):
        if v == 0:
            return chr(20)
        magnitude = v if v > 0 else -v
        # Smallest byte count whose limit is >= the magnitude.
        n = bisect_left( self._size_limits, magnitude )
        if v > 0:
            return chr(20+n) + struct.pack(">Q", v)[-n:]
        biased = self._size_limits[n] + v
        return chr(20-n) + struct.pack(">Q", biased)[-n:]

    def decode_ordered_selfterm(self, v, pos):
        n = ord(v[pos]) - 20
        width = abs(n)
        end = pos + 1 + width
        # Left-pad the payload with zero bytes so it unpacks as 64 bits.
        padded = chr(0) * (8 - width) + v[pos+1:end]
        magnitude = struct.unpack(">Q", padded)[0]
        if n >= 0:
            return magnitude, end
        return magnitude - self._size_limits[width], end


# Instantiating registers the codec with the default type system.
IntegerCodec(types)
class TupleCodec( TypeCodec ):
    """Codec for tuples, encoded as the concatenation of their elements.

    Self-terminating forms close the element list with `endcode`.  The
    whole-value forms leave it out and run to the end of the string.
    Elements are always written in a self-terminating encoding.
    """
    type = tuple
    typecode = "\x03"
    endcode = "\x04"  # Not *exactly* a type code, but has to be different from any valid type code and from 0xff
    # NOTE(review): endcode sorts above the type codes \x00-\x03, so in the
    # ordered self-terminating form a nested tuple seems to sort AFTER a longer
    # tuple that extends it with None/bytes/unicode/tuple - confirm intended.

    def _join_parts(self, v, encode_part):
        # Concatenate the self-terminating encoding of every element.
        return "".join( [encode_part(p) for p in v] )

    def _decode_until_endcode(self, v, pos, decode_part):
        # Decode elements after the type code until endcode is reached.
        items = []
        pos += 1
        while v[pos] != self.endcode:
            item, pos = decode_part(v, pos)
            items.append(item)
        return tuple(items), pos + 1

    def _decode_to_end(self, v, pos, decode_part):
        # Decode elements after the type code until the string is exhausted.
        stop = len(v)
        items = []
        pos += 1
        while pos != stop:
            item, pos = decode_part(v, pos)
            items.append(item)
        return tuple(items)

    def encode_ordered_selfterm(self, v):
        body = self._join_parts(v, self.typesystem.encode_key_part)
        return self.typecode + body + self.endcode

    def decode_ordered_selfterm(self, v, pos):
        return self._decode_until_endcode(v, pos, self.typesystem.decode_key_part)

    def encode_ordered(self, v):
        return self.typecode + self._join_parts(v, self.typesystem.encode_key_part)

    def decode_ordered(self, v, pos):
        return self._decode_to_end(v, pos, self.typesystem.decode_key_part)

    def encode_unordered_selfterm(self, v):
        body = self._join_parts(v, self.typesystem.encode_value_part)
        return self.typecode + body + self.endcode

    def decode_unordered_selfterm(self, v, pos):
        return self._decode_until_endcode(v, pos, self.typesystem.decode_value_part)

    def encode_unordered(self, v):
        return self.typecode + self._join_parts(v, self.typesystem.encode_value_part)

    def decode_unordered(self, v, pos):
        return self._decode_to_end(v, pos, self.typesystem.decode_value_part)


# Instantiating registers the codec with the default type system.
TupleCodec()
class ExtendedTypeCodec (TypeCodec):
    # This is really only half a codec- it handles dispatching of multibyte type codes
    # starting with 0x4f
    typecode = "\x4f"

    def _get_dec(self, v, pos):
        """Return the decoder registered for the multibyte type code at v[pos].

        The code is the 0x4f marker followed by a 7-bit encoded number, whose
        last character is the first one with the high bit clear.
        """
        last = pos + 1
        while ord(v[last]) >= 128:
            last += 1
        full_code = v[pos:last + 1]
        dec = self.typesystem.decoders.get(full_code)
        if not dec:
            raise ValueError("Unknown type code: " + repr(v))
        return dec

    def decode_ordered_selfterm(self, v, pos):
        target = self._get_dec(v, pos)
        return target.decode_ordered_selfterm(v, pos)

    def decode_unordered_selfterm(self, v, pos):
        target = self._get_dec(v, pos)
        return target.decode_unordered_selfterm(v, pos)

    def decode_unordered(self, v, pos):
        target = self._get_dec(v, pos)
        return target.decode_unordered(v, pos)

    def decode_ordered(self, v, pos):
        target = self._get_dec(v, pos)
        return target.decode_ordered(v, pos)


# Instantiating registers the dispatcher under the 0x4f type code.
ExtendedTypeCodec()
##class Subspace (object):
## def __init__(self, tuplePrefix, types=types):
## self.tuplePrefix = tuplePrefix
## self.types = types
## def __getitem__(self, name):
## return Subspace( self.tuplePrefix + (name,), self.types )
## def key(self):
## return self.types.encode_key( self.tuplePrefix )
## def encode_key(self, key):
## return self.types.encode_key( self.tuplePrefix + key )
## def decode_key(self, key):
## k = self.types.decode_key( key )
## assert k[:len(self.tuplePrefix)] == self.tuplePrefix
## return k[ len(self.tuplePrefix): ]
## def encode_value(self, value):
## return self.types.encode_value(value)
## def decode_value(self, value):
## return self.types.decode_value(value)
##
##class Subspace2 (object):
## def __init__(self, rawPrefix, types=types):
## self.rawPrefix = rawPrefix
## self.types = types
## def encode_key( self, key ):
## return self.rawPrefix + self.types.encode_key(key)
## def decode_key( self, key ):
## assert key.startswith(self.rawPrefix)
## return self.types.decode_key(key[len(self.rawPrefix):])
## def encode_value(self, value):
## return self.types.encode_value(value)
## def decode_value(self, value):
## return self.types.decode_value(value)
class Subspace (object):
    """A key namespace identified by a raw byte prefix.

    Keys are encoded as the prefix followed by the self-terminating ordered
    encoding of the key.  Values are encoded without any prefix.
    """

    def __init__(self, tuplePrefix, types=types, rawPrefix=""):
        """Build the prefix from rawPrefix plus the encoding of tuplePrefix.

        tuplePrefix -- value encoded with types.encode_key and appended to
                       rawPrefix; pass None to use rawPrefix alone.
        types       -- type system used for all encoding and decoding.
        rawPrefix   -- already-encoded string placed first in the prefix.
        """
        self.rawPrefix = rawPrefix
        # Identity test: None is the "no tuple prefix" sentinel.
        if tuplePrefix is not None:
            self.rawPrefix += types.encode_key(tuplePrefix)
        self.types = types

    def __getitem__(self, name):
        """Return the nested subspace whose prefix is this prefix plus the encoding of name."""
        return Subspace( None, self.types, self.encode_key( name ) )

    def key(self):
        """Return the raw prefix of this subspace."""
        return self.rawPrefix

    def encode_key( self, key ):
        """Return the prefix followed by the self-terminating ordered encoding of key."""
        return self.rawPrefix + self.types.encode_key_part(key)

    def decode_key( self, key ):
        """Decode the single key part that follows the prefix in an encoded key."""
        assert key.startswith(self.rawPrefix)
        # decode_key_part returns (value, next position); only the value is wanted.
        return self.types.decode_key_part( key, len(self.rawPrefix) )[0]

    def encode_value(self, value):
        """Encode value with the type system's whole-value unordered encoding."""
        return self.types.encode_value(value)

    def decode_value(self, value):
        """Decode a string produced by encode_value."""
        return self.types.decode_value(value)
def test_enc(v):
print repr(v), repr(types.encode_key(v)), repr(types.encode_value(v))
assert (types.decode_key(types.encode_key(v))==v and types.decode_value(types.encode_value(v))==v)
def test_random_int():
    """Return a random integer drawn uniformly from [-(2**64 - 1), 2**64 - 1].

    The range is the full span IntegerCodec can encode: magnitudes of at most
    eight bytes.  The previous lower bound of -(1<<64) was one past that span
    and could not be encoded.
    """
    limit = (1 << 64) - 1
    # randrange excludes its upper bound, hence limit + 1.
    return random.randrange( -limit, limit + 1 )
def test_random_edge_int():
    """Return a random integer within five of plus or minus a power of two (2**0 to 2**63)."""
    # Draw order matters for reproducibility under a fixed seed:
    # sign, then exponent, then the small offset.
    sign = random.choice((-1,+1))
    exponent = random.randrange(0,64)
    offset = random.randrange(-5,5)
    return sign * (1 << exponent) + offset
def test_random_bytes():
    """Return a random byte string built from characters that stress the escaping.

    About 1% of the time the result is a long run of NUL + 0xFF pairs.
    Otherwise it is zero to five characters drawn from a small alphabet that
    includes NUL and 0xFF, the two bytes the ordered string encoding treats
    specially.
    """
    if random.random() < 0.01:
        return "\x00\xff" * 200
    # '\xff' was previously written '\ff', which is form feed followed by 'f',
    # so the 0xFF escape byte never appeared in the short strings.
    chars = ['\x00', '\x01', 'a', '7', '\xfe', '\xff']
    return ''.join([random.choice(chars) for _ in range(random.randint(0, 5))])
def test_random_unicode():
    """Return a random unicode string of zero to five characters from an edge-case alphabet."""
    # u'\xff' was previously written u'\ff', which is form feed followed by
    # 'f' rather than U+00FF.
    chars = [u'\x00', u'\x01', u'a', u'7', u'\xfe', u'\xff', u'\u0000', u'\u0001', u'\uffff', u'\uff00']
    return u''.join([random.choice(chars) for _ in range(random.randint(0, 5))])
def test_random_tuple():
    """Return a tuple of zero to four random values (which may themselves be tuples)."""
    length = random.randint(0,4)
    items = [test_random_value() for _ in range(length)]
    return tuple(items)
def test_random_fruit():
    """Return a Fruit with a random type, color and weight (0 to 300)."""
    # Draw order is kept: type, color, weight.
    fruit_type = random.choice(["apple","banana"])
    color = random.choice(["red","green","yellow"])
    weight = random.randint(0,300)
    return Fruit( fruit_type, color, weight )
def test_random_value():
    """Pick one of the random-value generators uniformly and return what it produces."""
    generators = [
        lambda: None,
        test_random_edge_int,
        test_random_int,
        test_random_bytes,
        test_random_unicode,
        test_random_tuple,
        test_random_fruit,
    ]
    chosen = random.choice( generators )
    return chosen()
def test(iterations=1000000):
    """Round-trip `iterations` random values through test_enc.

    iterations -- how many random values to check.  Defaults to 1000000, the
                  count that was previously hard-coded, so a shorter run can
                  be requested without editing the function.
    """
    for i in range(iterations):
        test_enc(test_random_value())
def user_typecode( v ):
    """Return the type-code string for user type number v.

    v -- non-negative integer identifying a user-defined type.

    Raises ValueError for a negative v.  Previously values from -64 to -1
    silently produced a single character below 0x40, outside the reserved
    user range.
    """
    if v < 0:
        raise ValueError("User type codes must be non-negative: " + repr(v))
    # User type codes 0-14 come from reserved space 0x40-0x4e
    if v<15: return chr(0x40+v)
    # User type codes 15+ are 7-bit encoded
    return "\x4f" + _encodeUint7bit(v-15)
class ExtendedType (object):
    """Base for user types that encode themselves.

    A subclass supplies encode_ordered_selfterm (instance method) and
    decode_ordered_selfterm (classmethod).  The other three encodings and
    decodings default to those two.
    """

    def encode_ordered(self):
        encoded = self.encode_ordered_selfterm()
        return encoded

    def encode_unordered_selfterm(self):
        encoded = self.encode_ordered_selfterm()
        return encoded

    def encode_unordered(self):
        encoded = self.encode_ordered()
        return encoded

    @classmethod
    def decode_ordered(cls, v, pos):
        # The self-terminating decoder's result is indexed; element 0 is the value.
        decoded = cls.decode_ordered_selfterm(v, pos)
        return decoded[0]

    @classmethod
    def decode_unordered_selfterm(cls, v, pos):
        decoded = cls.decode_ordered_selfterm(v, pos)
        return decoded

    @classmethod
    def decode_unordered(cls, v, pos):
        value = cls.decode_ordered(v, pos)
        return value
class SimpleExtendedType (ExtendedType):
    """Extended type encoded as its type code plus its constructor parameters.

    A subclass provides `typecode` and to_constructor_parameters().  Decoding
    calls the class with the decoded parameters unpacked as positional arguments.
    """

    def encode_ordered_selfterm(self):
        params = self.to_constructor_parameters()
        return self.typecode + types.encode_key_part( params )

    @classmethod
    def decode_ordered_selfterm(cls,s,pos):
        # Skip the (possibly multibyte) type code, then read the parameters.
        params_start = pos + len(cls.typecode)
        params, end = types.decode_key_part(s, params_start)
        return cls( *params ), end
class Fruit (ExtendedType):
    """Example user-defined type: a fruit with a type, a color and a weight.

    Encoded as its type code followed by the self-terminating ordered
    encodings of fruitType, color and weight, in that order.
    """
    typecode = user_typecode(100)

    def __init__(self, fruitType, color, weight):
        self.fruitType = fruitType
        self.color = color
        self.weight = weight

    def __repr__(self):
        return "Fruit: %dg %s %s" % (self.weight, self.color, self.fruitType)

    def __eq__(self, other):
        # Let Python try the other operand instead of raising AttributeError
        # when compared with something that is not a Fruit.
        if not isinstance(other, Fruit):
            return NotImplemented
        return self.fruitType==other.fruitType and self.color==other.color and self.weight == other.weight

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__, so define it explicitly.
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal

    def encode_ordered_selfterm(self):
        return self.typecode + types.encode_key_part(self.fruitType) + types.encode_key_part(self.color) + types.encode_key_part(self.weight)

    @classmethod
    def decode_ordered_selfterm(cls,s,pos):
        """Decode a Fruit whose type code starts at s[pos]; return (instance, next position)."""
        pos += len(cls.typecode)
        a = []
        for i in range(3):
            v,pos = types.decode_key_part(s,pos)
            a.append(v)
        # Build via cls so that a subclass decodes to itself.
        return cls( *a ), pos
class UnitValue (SimpleExtendedType):
    """Example user-defined type: a value paired with its units."""
    typecode = user_typecode(100000)

    def __init__(self, value, units):
        self.value = value
        self.units = units

    def __repr__(self):
        return "".join( (str(self.value), str(self.units)) )

    def to_constructor_parameters(self):
        # Same order as the __init__ arguments, so decoding can call cls(*params).
        params = (self.value, self.units)
        return params
# Register the user-defined classes as their own decoders: each class exposes
# the decode_* classmethods, so the class object itself is stored under its
# (multibyte) type code in the default type system.
types.decoders[UnitValue.typecode] = UnitValue
types.decoders[Fruit.typecode] = Fruit
#types.encoders[Fruit.typecode] = SelfEncoder
# Other possible types:
# float, double, extended-double
# decimal (arbitrary-precision fixed point)
# bigger ints
# timestamp
# uuid
def set(subspace, key, value):
print "SET", types.decode_key(subspace.encode_key(key)), ":=", types.decode_value(subspace.encode_value(value))
# (GitHub page footer) Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment