Skip to content

Instantly share code, notes, and snippets.

@matteobertozzi
Created December 8, 2023 20:16
Show Gist options
  • Save matteobertozzi/4ae48cdcd5fc42a1745f94fcfe81ccba to your computer and use it in GitHub Desktop.
Save matteobertozzi/4ae48cdcd5fc42a1745f94fcfe81ccba to your computer and use it in GitHub Desktop.
Nested Json to Columnar encoding/decoding (Demo)
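# Nested data is flattened into a tree of columns, each identified by
# (parent_id, index); the root column has parent_id 0.
# There are 3 kinds of columns: STRUCT, ARRAY, and plain VALUES.
# The encode() method returns a list of columns, each containing:
# (parent_id, index, name, rows, nulls[], values[], type_names)
# For ARRAYs values[] holds the number of items of each non-null row;
# for STRUCTs it holds the number of keys of each non-null row.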
from dataclasses import is_dataclass, asdict as dataclass_as_dict, dataclass
import types
# Attribute values of these types are treated as behaviour rather than data:
# is_object() rejects them and object_as_dict() drops them.
CALLABLES_TYPES = (types.FunctionType, types.MethodType)


def is_object(obj) -> bool:
    """Return True when *obj* should be encoded through its ``__dict__``.

    Plain functions and bound methods are excluded explicitly.
    """
    # TODO: we must find a better way to do this!
    if isinstance(obj, CALLABLES_TYPES):
        return False
    return hasattr(obj, '__dict__')


def object_as_dict(obj: object) -> dict[str, any]:
    """Return the public, non-callable instance attributes of *obj*.

    Attributes whose name starts with ``_`` and attributes holding a function
    or bound method are left out.
    """
    public = {}
    for attr_name, attr_value in obj.__dict__.items():
        if attr_name[0] == '_':
            continue
        if isinstance(attr_value, CALLABLES_TYPES):
            continue
        public[attr_name] = attr_value
    return public
@dataclass
class ColumnGroup:
    """A single column of the flattened tree.

    Every row appends one flag to ``nulls`` (1 = null, 0 = present); only
    present rows append to ``values``. STRUCT columns store the number of keys
    of each object and ARRAY columns the number of items of each list; the
    nested data itself lives in the child columns (``parent_id == index``).
    """

    parent_id: int
    index: int
    name: str
    rows: int
    nulls: list[int]
    values: list[any]
    type_names: set[str]

    def __init__(self, parent_id: int, index: int, name: str):
        # NOTE(review): edit_id is never read in this file; kept for callers.
        self.edit_id = 0
        self.parent_id = parent_id
        self.index = index
        self.name = name
        self.rows = 0
        self.nulls = []
        self.values = []
        self.type_names = set()

    def is_struct(self):
        """Return True if a STRUCT has been recorded in this column."""
        return 'STRUCT' in self.type_names

    def is_array(self):
        """Return True if an ARRAY has been recorded in this column."""
        return 'ARRAY' in self.type_names

    def add_nulls(self, count: int):
        """Append *count* null rows (used to pad keys missing from a struct)."""
        self.rows += count
        self.nulls.extend([1] * count)

    def add_value(self, type_name: str, value: any):
        """Append one row of type *type_name*; ``None`` only sets the null flag.

        Raises:
            Exception: if *type_name* conflicts with the types already seen.
        """
        self._check_type_compatibility(type_name)
        self.rows += 1
        if value is None:
            self.nulls.append(1)
        else:
            self.nulls.append(0)
            self.values.append(value)

    def _check_type_compatibility(self, type_name: str):
        """Validate *type_name* against this column's types, then record it.

        NULL is compatible with every type and never relaxes the check for
        later rows; INT and FLOAT may be mixed; any other type must match the
        non-null types already present.

        Raises:
            Exception: on a type conflict (the type is then not recorded).
        """
        # Ignore NULL when comparing: a column that has seen a null must still
        # reject e.g. STRING followed by INT, exactly like one without nulls.
        seen = self.type_names - {'NULL'}
        if type_name != 'NULL' and seen:
            if seen & {'INT', 'FLOAT'}:
                compatible = type_name in ('INT', 'FLOAT')
            else:
                compatible = type_name in seen
            if not compatible:
                raise Exception('invalid %s type %s: expected %s' % (self.name, type_name, self.type_names))
        self.type_names.add(type_name)

    def array_sub_rows(self):
        """Count child slots of an ARRAY column.

        A null row counts as 1, a present row holding ``n`` items counts as
        ``max(1, n)``.

        NOTE(review): not called anywhere in this file, and the encoder writes
        no child rows for null or empty arrays, so this does not equal the
        child column's row count; confirm the intended semantics before use.
        """
        assert self.is_array()
        item_counts = iter(self.values)
        return sum(1 if is_null else max(1, next(item_counts)) for is_null in self.nulls)

    def struct_sub_rows(self):
        """Return the number of non-null rows of a STRUCT column.

        This is the row count every child column of the struct is padded to.
        """
        assert self.is_struct()
        return sum(1 for is_null in self.nulls if not is_null)
class ColumnarDataEncoder:
    """Collects the ColumnGroups produced while walking a value tree.

    Columns are keyed by ``(parent_id, name)`` and numbered from 1 in creation
    order, so ``columns[i - 1].index == i``; ``parent_id == 0`` is the root.
    """

    def __init__(self) -> None:
        self.columns: list[ColumnGroup] = []
        self._columns_map: dict[tuple[int, str], ColumnGroup] = {}
        # parent index -> child columns in creation order, so padding a
        # struct's children does not have to scan every column.
        self._children: dict[int, list[ColumnGroup]] = {}
        self._index = 0

    def struct_group_begin(self, parent_id: int, name: str) -> ColumnGroup:
        """Open (or reuse) the STRUCT column for *name* under *parent_id*.

        Raises:
            Exception: if the column already holds an incompatible type.
        """
        group, _ = self._add_column(parent_id, name)
        # Run the regular type check instead of adding 'STRUCT' blindly;
        # otherwise a column could silently become both ARRAY and STRUCT
        # (e.g. [[1], {'a': 1}]) and be decoded as an array.
        group._check_type_compatibility('STRUCT')
        return group

    def struct_group_end(self, group: ColumnGroup, num_keys: int) -> None:
        """Record one STRUCT row with *num_keys* keys and pad absent children."""
        group.add_value('STRUCT', num_keys)
        self._update_sub_groups(group, group.struct_sub_rows())

    def array_group_begin(self, parent_id: int, name: str) -> ColumnGroup:
        """Open (or reuse) the ARRAY column for *name* under *parent_id*.

        Raises:
            Exception: if the column already holds an incompatible type.
        """
        group, _ = self._add_column(parent_id, name)
        group._check_type_compatibility('ARRAY')
        return group

    def array_group_end(self, group: ColumnGroup, num_items: int) -> None:
        """Record one ARRAY row holding *num_items* items."""
        group.add_value('ARRAY', num_items)

    def _update_sub_groups(self, group: ColumnGroup, sub_rows: int):
        """Pad every child of *group* with nulls up to *sub_rows* rows."""
        for child in self._children.get(group.index, ()):
            if child.rows < sub_rows:
                child.add_nulls(sub_rows - child.rows)

    def add(self, parent_id: int, name: str, type_name: str, value: any):
        """Append a leaf value to column (*parent_id*, *name*) and return it."""
        group, _ = self._add_column(parent_id, name)
        group.add_value(type_name, value)
        return group

    def _add_column(self, parent_id: int, name: str) -> tuple[ColumnGroup, bool]:
        """Return ``(column, created)`` for (*parent_id*, *name*), creating it if needed."""
        key = (parent_id, name)
        group = self._columns_map.get(key)
        if group is not None:
            return group, False
        self._index += 1
        group = ColumnGroup(parent_id, self._index, name)
        if parent_id != 0:
            parent = self.columns[parent_id - 1]
            # A key first seen in a later object is null for every earlier
            # non-null row of its parent struct.
            if parent.is_struct():
                group.add_nulls(parent.struct_sub_rows())
        self.columns.append(group)
        self._columns_map[key] = group
        self._children.setdefault(parent_id, []).append(group)
        return group, True
class FlatEncoder:
    """Walks a Python value and feeds it into a ColumnarDataEncoder.

    Supported: None, bool, int, float, bytes/bytearray, str, list/tuple/set
    (as ARRAY), dict (as STRUCT), subclasses of those builtins, dataclass
    instances and other objects with a ``__dict__`` (both as STRUCT).
    """

    def __init__(self) -> None:
        self.data = ColumnarDataEncoder()
        self._types_map = {
            bool: self.encode_bool,
            int: self.encode_int,
            float: self.encode_float,
            bytes: self.encode_bytes,
            bytearray: self.encode_bytes,
            str: self.encode_string,
            list: self.encode_array,
            tuple: self.encode_array,
            set: self.encode_array,
            dict: self.encode_object,
        }

    def encode_item(self, item, parent: int = 0, key: str = None):
        """Encode *item* as one row of column (*parent*, *key*).

        Raises:
            Exception: if *item* has no supported representation.
        """
        if item is None:
            return self.encode_null(parent, key)
        item_type = type(item)
        encoder = self._types_map.get(item_type)
        if encoder is None:
            if is_dataclass(item):
                return self.encode_object(dataclass_as_dict(item), parent, key)
            # Subclasses of supported builtins (OrderedDict, IntEnum,
            # namedtuple, ...) use their base type's encoder. Without this, a
            # dict subclass with an instance __dict__ would be encoded from
            # its attributes via object_as_dict() instead of its items.
            encoder = self._base_type_encoder(item_type)
            if encoder is None:
                if is_object(item):
                    return self.encode_object(object_as_dict(item), parent, key)
                raise Exception('unsupported type %s: %s' % (item_type, item))
        return encoder(item, parent, key)

    def _base_type_encoder(self, item_type: type):
        """Return the encoder of the nearest supported base class of *item_type*, or None."""
        for base in item_type.__mro__[1:]:
            encoder = self._types_map.get(base)
            if encoder is not None:
                return encoder
        return None

    def encode_null(self, parent: int, key: str):
        self.data.add(parent, key, 'NULL', None)

    def encode_bool(self, value: bool, parent: int, key: str):
        self.data.add(parent, key, 'BOOL', value)

    def encode_int(self, value: int, parent: int, key: str) -> None:
        self.data.add(parent, key, 'INT', value)

    def encode_float(self, value: float, parent: int, key: str) -> None:
        self.data.add(parent, key, 'FLOAT', value)

    def encode_string(self, value: str, parent: int, key: str) -> None:
        self.data.add(parent, key, 'STRING', value)

    def encode_bytes(self, value: bytes, parent: int, key: str) -> None:
        self.data.add(parent, key, 'BYTES', value)

    def encode_object(self, obj: dict, parent: int, key: str) -> None:
        """Encode *obj* as one STRUCT row; every key becomes a child column."""
        group = self.data.struct_group_begin(parent, key)
        field_names = obj.keys()
        for field_name in field_names:
            self.encode_item(obj[field_name], group.index, field_name)
        self.data.struct_group_end(group, len(field_names))

    def encode_array(self, array: list, parent: int, key: str) -> None:
        """Encode *array* as one ARRAY row; items go to the unnamed child column."""
        group = self.data.array_group_begin(parent, key)
        for v in array:
            self.encode_item(v, group.index, None)
        self.data.array_group_end(group, len(array))
def encode(items) -> list[ColumnGroup]:
    """Flatten *items* into ColumnGroups, in creation (index) order, root first."""
    encoder = FlatEncoder()
    encoder.encode_item(items)
    return encoder.data.columns
# ================================================================================
# Decoder
# ================================================================================
@dataclass
class ColumnReader:
    """Sequential cursor over one encoded column.

    ``_null_index`` walks ``nulls`` (one entry per row) and ``_value_index``
    walks ``values`` (one entry per non-null row).
    """

    parent_id: int
    index: int
    name: str
    nulls: list[int]
    values: list[any]

    # Codes stored in ``self.type``.
    _ARRAY = 0
    _STRUCT = 1
    _VALUE = 2

    def __init__(self, column: ColumnGroup) -> None:
        self.parent_id = column.parent_id
        self.index = column.index
        self.name = column.name
        self.nulls = column.nulls
        self.values = column.values
        if column.is_array():
            kind = self._ARRAY
        elif column.is_struct():
            kind = self._STRUCT
        else:
            kind = self._VALUE
        self.type = kind
        self._null_index = 0
        self._value_index = 0

    def is_array(self) -> bool:
        return self.type == self._ARRAY

    def is_struct(self) -> bool:
        return self.type == self._STRUCT

    def is_null(self) -> int:
        """Return the null flag of the current row (truthy when null)."""
        return self.nulls[self._null_index]

    def read_value(self) -> any:
        """Return the current row's value and advance past the row."""
        assert not self.is_null()
        current = self.values[self._value_index]
        self._null_index += 1
        self._value_index += 1
        return current

    def next(self):
        """Skip the current row, consuming its value slot if it has one."""
        had_value = not self.is_null()
        self._null_index += 1
        if had_value:
            self._value_index += 1
def decode_item(parent_index: list[list[ColumnReader]], root: ColumnReader, tab: int):
    """Decode the current row of *root*, dispatching on the column kind.

    *tab* only controls the indentation of the trace line printed per item.
    Null rows are skipped and decode to None.
    """
    trace = '-> ITEM group:%s null:%s index:%s' % (root.index, root._null_index, root._value_index)
    print(' ' * tab, '->', root, trace)
    if root.is_null():
        root.next()
        return None
    nested_tab = tab + 2
    if root.is_array():
        return decode_array(parent_index, root, nested_tab)
    if root.is_struct():
        return decode_struct(parent_index, root, nested_tab)
    return root.read_value()
def decode_struct(parent_index: list[list[ColumnReader]], root: ColumnReader, tab: int):
    """Decode the current row of STRUCT column *root* into a dict.

    Child columns whose value decodes to None are left out of the result.
    """
    # Consume the struct's own row; the stored key count is not needed here.
    root.next()
    children = parent_index[root.index]
    decoded = ((child.name, decode_item(parent_index, child, tab + 2)) for child in children)
    return {name: value for name, value in decoded if value is not None}
def decode_array(parent_index: list[list[ColumnReader]], root: ColumnReader, tab: int):
    """Decode the current row of ARRAY column *root* into a list.

    The row's value is the item count; the items are read from the child
    column(s) of *root*.
    """
    item_count = root.read_value()
    children = parent_index[root.index]
    items = []
    for _ in range(item_count):
        items.extend(decode_item(parent_index, child, tab + 2) for child in children)
    return items
def decode(columns: list[ColumnGroup]):
    """Rebuild the original value from the columns returned by encode()."""
    ordered = sorted(columns, key=lambda col: (col.parent_id, col.index))
    # Column indexes run 1..len(columns); slot 0 collects the columns whose
    # parent_id is 0 (the root).
    by_parent = [[] for _ in range(len(ordered) + 1)]
    for column in ordered:
        by_parent[column.parent_id].append(ColumnReader(column))
    root = by_parent[0][0]
    assert root.parent_id == 0
    return decode_item(by_parent, root, 0)
# ================================================================================
# Verify Encode/Decode
# ================================================================================
def verify_item(a, b) -> bool:
    """Return True if decoded value *b* matches original value *a*.

    Both values must have exactly the same type; dicts and lists are compared
    recursively via verify_struct / verify_array.
    """
    if type(a) is not type(b):
        return False
    if isinstance(a, dict):
        return verify_struct(a, b)
    if isinstance(a, list):
        return verify_array(a, b)
    return a == b


def dict_to_null(a):
    """Collapse a dict whose values are all None (recursively; {} included) to None.

    The decoder omits struct keys whose value is None, so such dicts must
    compare equal to a missing key. Non-dict values are returned unchanged.
    """
    if not isinstance(a, dict):
        return a
    if any(dict_to_null(v) is not None for v in a.values()):
        return a
    return None


def verify_struct(a: dict[str, any], b: dict[str, any]) -> bool:
    """Compare two dicts key by key, treating missing keys and None alike."""
    for key in set(a) | set(b):
        v1 = dict_to_null(a.get(key))
        v2 = dict_to_null(b.get(key))
        # Only a None/missing pair is equivalent; falsy values such as 0, ''
        # or [] must still be compared, otherwise real mismatches are hidden.
        if v1 is None and v2 is None:
            continue
        if not verify_item(v1, v2):
            print('----> WRONG ITEM', v1, v2)
            return False
    return True


def verify_array(a: list[any], b: list[any]) -> bool:
    """Compare two lists element by element (lengths must match)."""
    return len(a) == len(b) and all(verify_item(v1, v2) for v1, v2 in zip(a, b))
def verify(items):
    """Round-trip *items* through encode()/decode() and raise on a mismatch."""
    separator = '-' * 160
    print(separator)
    print(items)
    print(separator)
    decoded = decode(encode(items))
    if verify_item(items, decoded):
        return
    print(items)
    print(decoded)
    raise Exception()
if __name__ == '__main__':
    # Every case is round-tripped through encode()/decode() by verify(), in order.
    cases = [
        [{"a": []}, {"a": [None]}, {"a": [{"x": 10}]}],
        {"a": 1, "b": 2, "c": {"d": 1, "e": 2}, "f": [1, 2, 3, 4], "g": {"h": [1, 2, 3], "I": 1},
         "l": [{"a": 10, "b": 20}], "m": {"a": [{"b": 10}]}},
        {"m": {"a": [{"b": 10}]}},
        [{'a': 10, 'c': 1000}, {'a': 20, 'b': 100}],
        [{'d': 1}, {'a': 2}, {'c': 3}],
        [{'a': []}, {}, {'a': [1]}],
        [{"a": {"x": 1}}, {}],
        [
            {'a': 10, 'b': [1, 2, 3], 'd': 100},
            {'b': [4, 5], 'd': 200, 'c': [10, 20, 30, 40]},
        ],
        [
            {'a': [{'x': 10}], 'b': 1000},
            {'a': [{'x': 20, 'y': 200}], 'b': 2000},
        ],
        [{
            'a': 10,
            'b': [100, 200],
            'c': {'x': 30},
            'd': [{'y': 40}, {'y': 50}, {}],
        }],
        [
            {"a": [1]},
            {},
            {"a": []},
            {"a": [None, 2]},
        ],
        [
            {"a": [1], "b": [{"b1": 1}, {"b1": 1, "b2": [3, 4]}]},
            {"b": [{"b1": 2}]},
            {"a": [None, None], "b": [None]},
        ],
        [
            {"a": 1, "b": {"b1": 1, "b2": 3}, "d": {"d1": 1}},
            {"a": 2, "b": {"b2": 4}, "c": {"c1": 6}, "d": {"d1": 2, "d2": 1}},
            {"b": {"b1": 5, "b2": 6}, "c": {"c1": 7}},
        ],
        [[], [], None],
        [None, {}, {'a': 10}, {}],
        [None, [], [10], [], None, [20, 30], None, [], [40]],
        [None, [[10, 20], [30]], [], None, []],
        [None, [[[10], [20]]], [[[30, 40]]], [], None, [[[50], [60]]]],
        [None, [[1, 2], [3, 4]], [[5, 6, 7, 8]], None, [[9, 10]]],
        [None, None, 10, 20, None, 30],
        [None, {}, {'a': {'x': 10}}, {}, {'a': {'x': 20, 'y': 200}}, None, {'a': {'x': 30}, 'b': {'x': 30}}],
    ]
    for case in cases:
        verify(case)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment