Skip to content

Instantly share code, notes, and snippets.

@etrepum
Created November 5, 2011 00:14
Show Gist options
  • Save etrepum/1340848 to your computer and use it in GitHub Desktop.
Save etrepum/1340848 to your computer and use it in GitHub Desktop.
# Implementation of BinarySortableSerDe for Python 2 (relies on izip and unicode)
from struct import pack, unpack, unpack_from
from itertools import izip
class Schema(object):
    """Named marker identifying one field type in an encode/decode format.

    Applying unary minus wraps the marker in ``Neg``; unary plus hands the
    marker back unchanged.
    """

    def __init__(self, name):
        # The name is only used as the marker's printable representation.
        self.name = name

    def __pos__(self):
        # ``+schema`` is a no-op.
        return self

    def __neg__(self):
        # ``-schema`` yields a Neg wrapper around this marker.
        return Neg(self)

    def __repr__(self):
        return self.name
class Neg(object):
    """Wrapper marking a schema as negated.

    Unary minus unwraps and returns the original schema; unary plus returns
    the wrapper itself.
    """

    def __init__(self, schema):
        # The wrapped (un-negated) schema marker.
        self.schema = schema

    def __pos__(self):
        # ``+neg`` is a no-op.
        return self

    def __neg__(self):
        # Negating twice gives back the wrapped schema.
        return self.schema

    def __repr__(self):
        # Rendered as the wrapped schema's repr with a leading minus sign.
        return '-{!r}'.format(self.schema)
# Singleton markers for the supported field types. encode() and
# decode_from() compare against these by identity.
Bool = Schema('Bool')
Byte = Schema('Byte')
Short = Schema('Short')
Int = Schema('Int')
Long = Schema('Long')
Double = Schema('Double')
Str = Schema('Str')

# 256-character translation table mapping every byte b to 255 - b; used with
# str.translate() to complement the bytes of negated Str fields.
_NEG_TABLE = ''.join(chr(255 - b) for b in range(256))
# Complex types are not supported; they are probably uninteresting for row keys.
def encode(fmt, data):
    r"""Encode the data tuple with BinarySortableSerDe format. Example:

    >>> encode((-Bool, Str, Long, Double), (True, 'hey!', -123, 1.2))
    '\x01\x01\x01hey!\x00\x01\x7f\xff\xff\xff\xff\xff\xff\x85\x01\xbf\xf3333333'

    Args:
        fmt: iterable of schema markers (Bool, Byte, Short, Int, Long,
            Double, Str), each optionally negated with unary minus.
        data: iterable of values paired positionally with ``fmt``. Pairing
            stops at the shorter of the two iterables.

    Returns:
        The packed byte string. Every field starts with a marker byte:
        0x00 for ``None`` (no payload follows) or 0x01 followed by the
        big-endian payload.

    Raises:
        TypeError: if a format entry is not one of the supported markers.

    For a negated field the value is transformed before encoding: Bool is
    logically inverted, Str has every byte complemented via ``_NEG_TABLE``,
    and any other type is arithmetically negated.
    """
    packfmt = ['>']
    values = []
    for f, v in izip(fmt, data):
        if v is None:
            # A lone 0x00 marker stands for a missing value.
            packfmt.append('B')
            values.append(0)
            continue
        # 0x01 marker: a payload follows.
        packfmt.append('B')
        values.append(1)
        if isinstance(f, Neg):
            f = -f
            if f is Bool:
                v = not v
            elif f is Str:
                # Complement the UTF-8 bytes; escaping happens further down.
                if isinstance(v, unicode):
                    v = v.encode('utf-8')
                v = v.translate(_NEG_TABLE)
            else:
                v = -v
        if f is Bool:
            packfmt.append('B')
            values.append(2 if v else 1)
        elif f is Byte:
            # Mask to the field width, then flip the sign bit so that the
            # unsigned byte order matches the signed numeric order.
            packfmt.append('B')
            values.append((v & 0xff) ^ 0x80)
        elif f is Short:
            packfmt.append('H')
            values.append((v & 0xffff) ^ 0x8000)
        elif f is Int:
            packfmt.append('I')
            values.append((v & 0xffffffff) ^ 0x80000000)
        elif f is Long:
            packfmt.append('Q')
            values.append((v & 0xffffffffffffffff) ^ 0x8000000000000000)
        elif f is Double:
            packfmt.append('Q')
            vint = unpack('>Q', pack('>d', v))[0]
            # Choose the mask from the IEEE-754 sign bit rather than from
            # ``v < 0``: -0.0 and sign-bit-set NaNs are not ``< 0`` yet carry
            # the sign bit, and decode_from() picks its mask from the encoded
            # high bit, so testing the value breaks the round trip for them.
            if vint & 0x8000000000000000:
                vint ^= 0xffffffffffffffff
            else:
                vint ^= 0x8000000000000000
            values.append(vint)
        elif f is Str:
            if isinstance(v, unicode):
                v = v.encode('utf-8')
            # Escape 0x01 first, then 0x00, so the payload never contains a
            # raw 0x00; a single 0x00 then terminates the string.
            vstr = (v.replace('\x01', '\x01\x02').replace('\x00', '\x01\x01')
                    + '\x00')
            packfmt.append('{}s'.format(len(vstr)))
            values.append(vstr)
        else:
            raise TypeError("unsupported format {!r}".format(f))
    return pack(''.join(packfmt), *values)
def decode_from(fmt, data, i=0):
    """Decode one value per entry of ``fmt`` from ``data``, starting at ``i``.

    Args:
        fmt: iterable of schema markers, each optionally negated.
        data: the encoded string to read from.
        i: offset in ``data`` at which decoding starts.

    Returns:
        A ``(values, next_offset)`` pair: the list of decoded values and the
        offset just past the last byte consumed.

    Raises:
        ValueError: on a marker byte other than 0x00/0x01, or a Bool payload
            byte other than 0x01/0x02.
        TypeError: if a format entry is not one of the supported markers.
    """
    pos = i
    decoded = []
    for spec in fmt:
        marker = data[pos]
        pos += 1
        if marker == '\x00':
            # Null marker: no payload follows.
            decoded.append(None)
            continue
        if marker != '\x01':
            raise ValueError(
                "Invalid marker {!r} at index {}".format(marker, pos - 1))

        negated = isinstance(spec, Neg)
        base = -spec if negated else spec

        if base is Bool:
            flag = data[pos]
            pos += 1
            if flag not in ('\x02', '\x01'):
                raise ValueError(
                    "Invalid Bool {!r} at index {}".format(flag, pos - 1))
            truth = flag == '\x02'
            # XOR with the negation flag undoes the inversion done on encode.
            decoded.append(truth != negated)
            continue

        if base is Str:
            # The payload runs up to the first raw 0x00 terminator.
            end = data.index('\x00', pos)
            raw = data[pos:end]
            pos = end + 1
            text = raw.replace('\x01\x01', '\x00')
            text = text.replace('\x01\x02', '\x01')
            decoded.append(text.translate(_NEG_TABLE) if negated else text)
            continue

        # Numeric types: undo the sign-bit bias, then the optional negation.
        if base is Byte:
            number = ord(data[pos]) - 0x80
            pos += 1
        elif base is Short:
            number = unpack_from('>H', data, pos)[0] - 0x8000
            pos += 2
        elif base is Int:
            number = unpack_from('>I', data, pos)[0] - 0x80000000
            pos += 4
        elif base is Long:
            number = unpack_from('>Q', data, pos)[0] - 0x8000000000000000
            pos += 8
        elif base is Double:
            bits = unpack_from('>Q', data, pos)[0]
            pos += 8
            # High bit set: only that bit was flipped on encode.
            # High bit clear: every bit was flipped.
            if bits & 0x8000000000000000:
                mask = 0x8000000000000000
            else:
                mask = 0xffffffffffffffff
            number = unpack('>d', pack('>Q', bits ^ mask))[0]
        else:
            raise TypeError("unsupported format {!r}".format(base))
        decoded.append(-number if negated else number)
    return decoded, pos
def decode(fmt, data):
    """Decode ``data`` according to ``fmt`` and return only the value list.

    Thin wrapper over ``decode_from`` that starts at offset 0 and discards
    the returned end offset.
    """
    result = decode_from(fmt, data)
    return result[0]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment