Created
December 8, 2023 20:16
-
-
Save matteobertozzi/4ae48cdcd5fc42a1745f94fcfe81ccba to your computer and use it in GitHub Desktop.
Nested Json to Columnar encoding/decoding (Demo)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Data is transformed into a tree of columns linked by (parent_id, index).
# There are 3 kinds of columns: STRUCT, ARRAY and plain VALUES.
# The encode() function returns a list of columns, each containing:
# (parent_id, index, name, rows, nulls[], values[], type_names)
# For ARRAYs, values[] contains the number of items of each non-null row;
# for STRUCTs, values[] contains the number of keys of each non-null row.
from dataclasses import is_dataclass, asdict as dataclass_as_dict, dataclass | |
import types | |
# Callable kinds that are never treated as, or serialized into, plain data.
CALLABLES_TYPES = (types.FunctionType, types.MethodType)


def is_object(obj) -> bool:
    """Return True when *obj* has a ``__dict__`` and is not a function or bound method."""
    # TODO: we must find a better way to do this!
    return not isinstance(obj, CALLABLES_TYPES) and hasattr(obj, '__dict__')


def object_as_dict(obj: object) -> dict[str, object]:
    """Return the public, non-callable instance attributes of *obj* as a dict.

    Attributes whose name starts with an underscore and attributes whose
    value is a function or bound method are left out.
    """
    # startswith() rather than k[0]: an empty attribute name must not raise
    # IndexError.
    return {
        k: v
        for k, v in obj.__dict__.items()
        if not k.startswith('_') and not isinstance(v, CALLABLES_TYPES)
    }
@dataclass
class ColumnGroup:
    """A single column of the columnar encoding.

    Every row appends one flag to ``nulls`` (1 = null, 0 = value present);
    only non-null rows append an entry to ``values``. ``type_names``
    collects every type tag recorded for the column, including 'NULL' and
    the structural tags 'STRUCT' and 'ARRAY'.

    The hand-written ``__init__`` below is kept as-is by ``@dataclass``
    (it never overwrites an existing ``__init__``).
    """

    parent_id: int
    index: int
    name: str
    rows: int
    nulls: list[int]
    values: list[object]
    type_names: set[str]

    def __init__(self, parent_id: int, index: int, name: str):
        """Create an empty column identified by *index* under *parent_id*."""
        self.edit_id = 0
        self.parent_id = parent_id
        self.index = index
        self.name = name
        self.rows = 0
        self.nulls = []
        self.values = []
        self.type_names = set()

    def is_struct(self):
        """Return True when the column has been tagged 'STRUCT'."""
        return 'STRUCT' in self.type_names

    def is_array(self):
        """Return True when the column has been tagged 'ARRAY'."""
        return 'ARRAY' in self.type_names

    def add_nulls(self, count: int):
        """Append *count* null rows to the column."""
        self.rows += count
        self.nulls.extend([1] * count)

    def add_value(self, type_name: str, value: object):
        """Append one row tagged *type_name*; a ``None`` value is stored as a null.

        Raises:
            Exception: if *type_name* is incompatible with the types already
                recorded for this column.
        """
        self._check_type_compatibility(type_name)
        self.rows += 1
        if value is None:
            self.nulls.append(1)
        else:
            self.nulls.append(0)
            self.values.append(value)

    def _check_type_compatibility(self, type_name: str):
        """Record *type_name*, rejecting it when it clashes with earlier types.

        'NULL' is compatible with everything. 'INT' and 'FLOAT' may be mixed
        with each other. Any other type must match a type already recorded.

        Raises:
            Exception: on an incompatible type.
        """
        if type_name != 'NULL':
            # Compare against the real types only: a previously seen 'NULL'
            # must not switch the check off for the rest of the column.
            known = self.type_names - {'NULL'}
            if known:
                numeric = {'INT', 'FLOAT'}
                allowed = numeric if known & numeric else known
                if type_name not in allowed:
                    raise Exception('invalid %s type %s: expected %s' % (self.name, type_name, self.type_names))
        self.type_names.add(type_name)

    def array_sub_rows(self):
        """Return the row count implied by an ARRAY column.

        A null row counts as 1; a non-null row counts as ``max(1, n)`` where
        ``n`` is the value stored for that row.
        """
        assert self.is_array()
        count = 0
        value_index = 0
        for is_null in self.nulls:
            if is_null:
                count += 1
            else:
                count += max(1, self.values[value_index])
                value_index += 1
        return count

    def struct_sub_rows(self):
        """Return the number of non-null rows of a STRUCT column."""
        assert self.is_struct()
        return sum(1 for is_null in self.nulls if not is_null)
class ColumnarDataEncoder:
    """Collects values into ColumnGroup columns addressed by (parent_id, name)."""

    def __init__(self) -> None:
        self._index = 0
        self._columns_map: dict[tuple[int, str], ColumnGroup] = {}
        self.columns: list[ColumnGroup] = []

    def struct_group_begin(self, parent_id: int, name: str) -> ColumnGroup:
        """Fetch or create the column for a struct and tag it 'STRUCT'."""
        struct_column = self._add_column(parent_id, name)[0]
        struct_column.type_names.add('STRUCT')
        return struct_column

    def struct_group_end(self, group: ColumnGroup, num_keys: int) -> None:
        """Close one struct row, then pad its direct children with nulls."""
        group.add_value('STRUCT', num_keys)
        expected_rows = group.struct_sub_rows()
        self._update_sub_groups(group, expected_rows)

    def array_group_begin(self, parent_id: int, name: str) -> ColumnGroup:
        """Fetch or create the column for an array and tag it 'ARRAY'."""
        array_column = self._add_column(parent_id, name)[0]
        array_column.type_names.add('ARRAY')
        return array_column

    def array_group_end(self, group: ColumnGroup, num_items: int) -> None:
        """Close one array row by storing its item count."""
        group.add_value('ARRAY', num_items)

    def _update_sub_groups(self, group: ColumnGroup, sub_rows: int):
        """Pad every direct child of *group* with nulls up to *sub_rows* rows."""
        for child in self.columns:
            if child.parent_id == group.index and child.rows < sub_rows:
                child.add_nulls(sub_rows - child.rows)

    def add(self, parent_id: int, name: str, type_name: str, value: any):
        """Append *value* to the (parent_id, name) column and return that column."""
        target = self._add_column(parent_id, name)[0]
        target.add_value(type_name, value)
        return target

    def _add_column(self, parent_id: int, name: str) -> tuple[ColumnGroup, bool]:
        """Return ``(column, created)`` for the (parent_id, name) pair."""
        lookup_key = (parent_id, name)
        existing = self._columns_map.get(lookup_key)
        if existing is not None:
            return existing, False

        self._index += 1
        created = ColumnGroup(parent_id, self._index, name)
        if parent_id != 0:
            # Column indexes start at 1 and columns are appended in index
            # order, so the parent sits at position parent_id - 1.
            owner = self.columns[parent_id - 1]
            if owner.is_struct():
                # A late field is null for every struct row already stored.
                created.add_nulls(owner.struct_sub_rows())
        self._columns_map[lookup_key] = created
        self.columns.append(created)
        return created, True
class FlatEncoder:
    """Walks a nested Python value and feeds it into a ColumnarDataEncoder."""

    def __init__(self) -> None:
        self.data = ColumnarDataEncoder()
        # Dispatch on the exact type of the item (no subclass matching).
        self._types_map = {
            dict: self.encode_object,
            list: self.encode_array,
            tuple: self.encode_array,
            set: self.encode_array,
            str: self.encode_string,
            bytes: self.encode_bytes,
            bytearray: self.encode_bytes,
            bool: self.encode_bool,
            int: self.encode_int,
            float: self.encode_float,
        }

    def encode_item(self, item, parent: int = 0, key: str = None):
        """Encode *item* under column *parent* with field name *key*.

        Raises:
            Exception: when the item's type is not supported.
        """
        if item is None:
            return self.encode_null(parent, key)

        item_type = type(item)
        handler = self._types_map.get(item_type)
        if handler is not None:
            return handler(item, parent, key)

        # Not a built-in type: fall back to dataclasses, then plain objects.
        if is_dataclass(item):
            return self.encode_object(dataclass_as_dict(item), parent, key)
        if is_object(item):
            return self.encode_object(object_as_dict(item), parent, key)
        raise Exception('unsupported type %s: %s' % (item_type, item))

    def encode_null(self, parent: int, key: str):
        """Store a null row."""
        self.data.add(parent, key, 'NULL', None)

    def encode_bool(self, value: bool, parent: int, key: str):
        """Store a BOOL value."""
        self.data.add(parent, key, 'BOOL', value)

    def encode_int(self, value: int, parent: int, key: str) -> None:
        """Store an INT value."""
        self.data.add(parent, key, 'INT', value)

    def encode_float(self, value: float, parent: int, key: str) -> None:
        """Store a FLOAT value."""
        self.data.add(parent, key, 'FLOAT', value)

    def encode_string(self, value: str, parent: int, key: str) -> None:
        """Store a STRING value."""
        self.data.add(parent, key, 'STRING', value)

    def encode_bytes(self, value: bytes, parent: int, key: str) -> None:
        """Store a BYTES value."""
        self.data.add(parent, key, 'BYTES', value)

    def encode_object(self, obj: dict, parent: int, key: str) -> None:
        """Encode a mapping as one struct row plus one child column per key."""
        struct_column = self.data.struct_group_begin(parent, key)
        for field_name in obj:
            self.encode_item(obj[field_name], struct_column.index, field_name)
        self.data.struct_group_end(struct_column, len(obj))

    def encode_array(self, array: list, parent: int, key: str) -> None:
        """Encode a sequence as one array row; its items share one unnamed child column."""
        array_column = self.data.array_group_begin(parent, key)
        for element in array:
            self.encode_item(element, array_column.index, None)
        self.data.array_group_end(array_column, len(array))
def encode(items) -> list[ColumnGroup]:
    """Encode *items* with a fresh FlatEncoder and return the resulting columns."""
    flat_encoder = FlatEncoder()
    flat_encoder.encode_item(items)
    columns = flat_encoder.data.columns
    return columns
# ================================================================================ | |
# Decoder | |
# ================================================================================ | |
@dataclass
class ColumnReader:
    """Sequential cursor over the nulls and values of one encoded column."""

    parent_id: int
    index: int
    name: str
    nulls: list[int]
    values: list[any]

    def __init__(self, column: ColumnGroup) -> None:
        self.parent_id = column.parent_id
        self.index = column.index
        self.name = column.name
        self.values = column.values
        self.nulls = column.nulls
        # Kind tag: 0 = array, 1 = struct, 2 = plain values.
        self.type = 0 if column.is_array() else (1 if column.is_struct() else 2)
        # Independent cursors: nulls has one entry per row, values only one
        # per non-null row.
        self._value_index = 0
        self._null_index = 0

    def is_array(self) -> bool:
        """Return True when the column was tagged as an array."""
        return self.type == 0

    def is_struct(self) -> bool:
        """Return True when the column was tagged as a struct."""
        return self.type == 1

    def is_null(self) -> int:
        """Return the null flag of the current row."""
        return self.nulls[self._null_index]

    def read_value(self) -> any:
        """Return the value of the current (non-null) row and advance past it."""
        assert not self.is_null()
        current = self.values[self._value_index]
        self._null_index += 1
        self._value_index += 1
        return current

    def next(self):
        """Advance past the current row without returning its value."""
        row_is_null = self.is_null()
        if not row_is_null:
            self._value_index += 1
        self._null_index += 1
def decode_item(parent_index: list[list[ColumnReader]], root: ColumnReader, tab: int):
    """Decode the current row of *root*, recursing into arrays and structs.

    *parent_index* maps a column index to the readers of its child columns;
    *tab* is only used to indent the trace output.
    """
    cursor_info = '-> ITEM group:%s null:%s index:%s' % (root.index, root._null_index, root._value_index)
    print(' ' * tab, '->', root, cursor_info)

    if root.is_null():
        # Consume the null row so the cursor stays aligned.
        root.next()
        decoded = None
    elif root.is_array():
        decoded = decode_array(parent_index, root, tab + 2)
    elif root.is_struct():
        decoded = decode_struct(parent_index, root, tab + 2)
    else:
        decoded = root.read_value()
    return decoded
def decode_struct(parent_index: list[list[ColumnReader]], root: ColumnReader, tab: int):
    """Decode one struct row into a dict, leaving out fields that decode to None."""
    # Step over the struct's own row; its fields live in the child columns.
    root.next()
    fields = {}
    for child in parent_index[root.index]:
        decoded = decode_item(parent_index, child, tab + 2)
        if decoded is None:
            continue
        fields[child.name] = decoded
    return fields
def decode_array(parent_index: list[list[ColumnReader]], root: ColumnReader, tab: int):
    """Decode one array row into a list.

    The value stored for the row is the item count; each item is read from
    the child column(s) of *root*.
    """
    item_count = root.read_value()
    return [
        decode_item(parent_index, child, tab + 2)
        for _ in range(item_count)
        for child in parent_index[root.index]
    ]
def decode(columns: list[ColumnGroup]):
    """Rebuild the nested value from encoded *columns*.

    Readers are grouped by parent id (each group ordered by column index);
    decoding starts from the first column whose parent id is 0.
    """
    ordered = sorted(columns, key=lambda col: (col.parent_id, col.index))
    # Slot 0 holds the root; slot i holds the children of column i.
    parent_index = [[] for _ in range(len(ordered) + 1)]
    for col in ordered:
        parent_index[col.parent_id].append(ColumnReader(col))

    root = parent_index[0][0]
    assert root.parent_id == 0
    return decode_item(parent_index, root, 0)
# ================================================================================ | |
# Verify Encode/Decode | |
# ================================================================================ | |
def verify_item(a, b) -> bool:
    """Return whether *a* and *b* have the same exact type and equal content.

    Dicts and lists are compared by verify_struct / verify_array; anything
    else is compared with ``==``.
    """
    same_type = type(a) == type(b)
    if not same_type:
        return False

    pair = (a, b)
    if all(isinstance(side, dict) for side in pair):
        return verify_struct(a, b)
    if all(isinstance(side, list) for side in pair):
        return verify_array(a, b)
    return a == b
def dict_to_null(a):
    """Collapse a dict that holds no real value into None.

    Non-dict values are returned unchanged. A dict is returned unchanged when
    at least one of its values survives this same collapsing; otherwise
    (empty, or only None / collapsible dicts inside) the result is None.
    """
    if not isinstance(a, dict):
        return a
    has_content = any(dict_to_null(inner) is not None for inner in a.values())
    return a if has_content else None
def verify_struct(a: dict[str, object], b: dict[str, object]) -> bool:
    """Return whether dicts *a* and *b* match, key by key.

    Values are first normalized with dict_to_null, so a key that is missing,
    None, or an effectively empty dict counts as absent on that side. A key
    absent on both sides is skipped; every other key must pass verify_item.
    The first mismatching pair is printed before returning False.
    """
    for key in set(a) | set(b):
        v1 = dict_to_null(a.get(key))
        v2 = dict_to_null(b.get(key))
        # Only "absent on both sides" may be skipped. A plain truthiness
        # test would also wave through pairs such as 0 vs None or '' vs [].
        if v1 is None and v2 is None:
            continue
        if not verify_item(v1, v2):
            print('----> WRONG ITEM', v1, v2)
            return False
    return True
def verify_array(a: list[any], b: list[any]) -> bool:
    """Return whether lists *a* and *b* have equal length and pairwise matching items."""
    same_length = len(a) == len(b)
    return same_length and all(verify_item(left, right) for left, right in zip(a, b))
def verify(items):
    """Round-trip *items* through encode/decode and raise if the result differs.

    The input is printed between two separator rules. On a mismatch the
    input and the decoded value are printed before raising.
    """
    rule = '-' * 160
    print(rule, items, rule, sep='\n')

    dec = decode(encode(items))
    if verify_item(items, dec):
        return
    print(items)
    print(dec)
    raise Exception()
if __name__ == '__main__':
    # Round-trip samples, verified in order; the first failure aborts the run.
    samples = [
        [{"a": []}, {"a": [None]}, {"a": [{"x": 10}]}],
        {
            "a": 1,
            "b": 2,
            "c": {"d": 1, "e": 2},
            "f": [1, 2, 3, 4],
            "g": {"h": [1, 2, 3], "I": 1},
            "l": [{"a": 10, "b": 20}],
            "m": {"a": [{"b": 10}]},
        },
        {"m": {"a": [{"b": 10}]}},
        [{'a': 10, 'c': 1000}, {'a': 20, 'b': 100}],
        [{'d': 1}, {'a': 2}, {'c': 3}],
        [{'a': []}, {}, {'a': [1]}],
        [{"a": {"x": 1}}, {}],
        [
            {'a': 10, 'b': [1, 2, 3], 'd': 100},
            {'b': [4, 5], 'd': 200, 'c': [10, 20, 30, 40]},
        ],
        [
            {'a': [{'x': 10}], 'b': 1000},
            {'a': [{'x': 20, 'y': 200}], 'b': 2000},
        ],
        [{
            'a': 10,
            'b': [100, 200],
            'c': {'x': 30},
            'd': [{'y': 40}, {'y': 50}, {}],
        }],
        [
            {"a": [1]},
            {},
            {"a": []},
            {"a": [None, 2]},
        ],
        [
            {"a": [1], "b": [{"b1": 1}, {"b1": 1, "b2": [3, 4]}]},
            {"b": [{"b1": 2}]},
            {"a": [None, None], "b": [None]},
        ],
        [
            {
                "a": 1,
                "b": {"b1": 1, "b2": 3},
                "d": {"d1": 1},
            },
            {
                "a": 2,
                "b": {"b2": 4},
                "c": {"c1": 6},
                "d": {"d1": 2, "d2": 1},
            },
            {
                "b": {"b1": 5, "b2": 6},
                "c": {"c1": 7},
            },
        ],
        [[], [], None],
        [None, {}, {'a': 10}, {}],
        [None, [], [10], [], None, [20, 30], None, [], [40]],
        [None, [[10, 20], [30]], [], None, []],
        [None, [[[10], [20]]], [[[30, 40]]], [], None, [[[50], [60]]]],
        [None, [[1, 2], [3, 4]], [[5, 6, 7, 8]], None, [[9, 10]]],
        [None, None, 10, 20, None, 30],
        [None, {}, {'a': {'x': 10}}, {}, {'a': {'x': 20, 'y': 200}}, None, {'a': {'x': 30}, 'b': {'x': 30}}],
    ]
    for sample in samples:
        verify(sample)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment