Skip to content

Instantly share code, notes, and snippets.

@davidlatwe
Created August 6, 2018 05:04
Show Gist options
  • Save davidlatwe/5c679bd1e29546bc4dcb2768af7c12b7 to your computer and use it in GitHub Desktop.
Save davidlatwe/5c679bd1e29546bc4dcb2768af7c12b7 to your computer and use it in GitHub Desktop.
montydb.fieldwalker, NodeTree implementation
# from collections import deque
class _NoVal(object):
def __repr__(self):
return "_NoVal()"
__solts__ = ()
_no_val = _NoVal()
# _no_val = object()
class FieldValues(object):
    """Iterable view over the values carried by a list of field nodes.

    Attributes:
        nodes: the nodes handed to the constructor.
        values: snapshot taken at construction time; a list-valued node
            contributes the list itself (its elements are not expanded).
        exists: True when at least one node has a truthy ``exists``.
        null_or_missing: True when any node reports ``is_missing()``, or
            when something exists and ``None`` is among ``values``.
        matched_node: the node whose value is currently being yielded.
            It is reset to ``None`` only when an iteration runs to
            completion, so after an early stop (for example a successful
            ``x in field_values``) it still points at the node that
            produced the last value.

    The object is its own iterator: ``__iter__`` stores a fresh generator
    and returns ``self``, so two overlapping iterations share state.
    """

    # "__iter" is name-mangled exactly like the ``self.__iter`` accesses
    # below, so the slot and the attribute refer to the same storage.
    __slots__ = ("nodes", "values", "exists", "null_or_missing",
                 "matched_node", "_value_iter", "__iter")

    def __init__(self, nodes):
        self.nodes = nodes
        # Packed snapshot: arrays stay whole.
        self.values = list(self._iter(False, False, True))
        self.exists = any(node.exists for node in nodes)

        if any(node.is_missing() for node in nodes):
            self.null_or_missing = True
        else:
            self.null_or_missing = self.exists and None in self.values

        self.matched_node = None
        self._value_iter = self.iter_full

    def __repr__(self):
        return "FieldValues({})".format(self.values)

    def __eq__(self, other):
        return self.values == other

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Leaving the ``with`` block restores the default iteration mode.
        self._value_iter = self.iter_full

    def __iter__(self):
        self.__iter = self._value_iter()
        return self

    def __next__(self):
        return next(self.__iter)

    next = __next__

    def iter_full(self):
        """Yield array elements (where allowed), arrays and plain values."""
        return self._iter(False, True, True)

    def iter_arrays(self):
        """Yield only list-valued nodes, each as a whole list."""
        return self._iter(True, False, True)

    def _iter(self, array_only, unpack, pack):
        """Generate values node by node, tracking ``matched_node``.

        Args:
            array_only: when true, values that are not lists are skipped.
            unpack: when true, the elements of a list value are yielded one
                by one, unless the node is flagged ``picked``.
            pack: when true, a list value is yielded as a whole list.
        """
        for current in self.nodes:
            self.matched_node = current
            held = current.value

            if not isinstance(held, list):
                # doc or array positioned doc
                if not array_only and held is not _no_val:
                    yield held
                continue

            # docs in array
            if unpack and not current.picked:
                for item in held:
                    if item is not _no_val:
                        yield item
            if pack:
                yield held

        # Only reached when the generator is exhausted.
        self.matched_node = None
class FieldNode(str):
    """A field name (the string itself) plus what was found under it.

    Attributes:
        value: the value stored for this field.
        picked: flag set by the creator of the node; together with
            ``in_array`` it exempts the node from being reported missing.
        exists: whether the field was actually present.
        in_array: whether the node belongs to an array.
        parent: the node this one was grown from, or ``None``.
        children: nodes grown from this one, in creation order.
    """

    __slots__ = ("value", "picked", "exists",
                 "in_array", "parent", "children")

    def __new__(cls, field, doc, picked=False, exists=False,
                in_array=False, parent=None):
        node = str.__new__(cls, field)
        node.value = doc
        node.picked = picked
        node.exists = exists
        # A node built from a name that is itself flagged ``in_array``
        # inherits that flag regardless of the argument.
        node.in_array = True if getattr(field, "in_array", False) else in_array
        node.parent = parent
        node.children = []
        return node

    def __repr__(self):
        return "FieldNode({})".format(self)

    def is_missing(self):
        """Return True when the field is absent.

        A node that is both ``in_array`` and ``picked`` is never reported
        missing; every other node is missing exactly when it does not
        exist.
        """
        if self.in_array and self.picked:
            return False
        return not self.exists

    def full_path(self):
        """Return the dotted path from the top of the tree to this node."""
        parent_path = getattr(self.parent, "full_path", None)
        prefix = "" if parent_path is None else parent_path()
        if not prefix:
            return self
        return prefix + "." + self

    def grow(self, value, field, picked=False, exists=True, in_array=False):
        """Append a new child node named ``field`` holding ``value``."""
        child = FieldNode(field, value, picked, exists, in_array, self)
        self.children.append(child)
class FieldTree(object):
    """Tree of field nodes rooted at a whole document.

    Args:
        doc: the document stored as the value of the root node.
        doc_type: the mapping type to record as ``map_cls``; defaults to
            ``type(doc)``.
    """

    def __init__(self, doc, doc_type=None):
        self.map_cls = doc_type or type(doc)
        # The root has an empty name and always exists.
        self.root = FieldNode("", doc, exists=True)

    def __str__(self):
        def render(node, level=0):
            # One "*" when the node does not exist, a second one when it
            # also reports itself missing.
            stars = "*" * ((not node.exists) + node.is_missing())
            head = "\t" * level + node + stars + "\n"
            return head + "".join(
                render(child, level + 1) for child in node.children)

        return "FieldTree({})".format(render(self.root))

    def __repr__(self):
        # ``__str__`` already wraps the rendering in "FieldTree(...)";
        # formatting ``self`` into the same template again produced
        # "FieldTree(FieldTree(...))".
        return str(self)
class FieldTreeTraverser(object):
    """Walk a field tree along a list of field names, growing it on demand.

    After ``build`` returns, ``leaves`` holds the nodes reached for the
    requested path. Nodes grown during a walk stay attached to the tree, so
    a later walk over the same prefix reuses them instead of re-reading the
    document.
    """

    def __init__(self):
        self.map_cls = None
        self.leaves = []

    def build(self, tree, fields):
        """Resolve ``fields`` against ``tree`` and store the result in
        ``self.leaves``.

        Args:
            tree: object exposing ``map_cls`` and ``root``.
            fields: field names, i.e. a dotted path already split on ".".
        """
        self.map_cls = tree.map_cls
        self.leaves = [tree.root]
        pre_field = ""

        for field in fields:
            # Reuse children grown by an earlier walk. A child matches when
            # it is named ``field`` or "<index>.<field>".
            cached = [child
                      for leaf in self.leaves
                      for child in leaf.children
                      if field == child or child.endswith("." + field)]
            if cached:
                self.leaves = cached
                pre_field = field
                continue

            grown = []
            for node in self.leaves:
                # Only leaves named after the previous field are expanded.
                if not (pre_field == node or node.endswith("." + pre_field)):
                    continue
                if node.exists is False:
                    # Nothing to descend into; carry the node forward.
                    grown.append(node)
                else:
                    grown += self.read(node, field)
            self.leaves = grown
            pre_field = field

            # stop if all nodes not exists
            if all(node.exists is False for node in self.leaves):
                break

    def read(self, node, field):
        """Grow the children of ``node`` for ``field`` and return them.

        Only the nodes created by this call are returned. The original
        returned ``node.children`` as a whole, which also contained
        children grown by earlier walks for *other* fields; those unrelated
        siblings then leaked into ``leaves``.
        """
        first_new = len(node.children)

        if isinstance(node.value, self.map_cls):
            self.read_map(node, field)
        elif isinstance(node.value, list):
            self.read_array(node, field)
        else:
            # Neither a mapping nor a list: the field cannot exist below it.
            node.grow(_no_val, field, exists=False,
                      picked=node.picked, in_array=node.in_array)

        return node.children[first_new:]

    def read_map(self, node, field, index=None, elem=None):
        """Grow one child of ``node`` for ``field`` looked up in a mapping.

        The mapping is ``elem`` when given, otherwise ``node.value``. When
        ``index`` is given the child is named "<index>.<field>" and flagged
        ``in_array``.
        """
        doc = node.value if elem is None else elem
        try:
            val = doc[field]
            exists = True
        except KeyError:
            val = _no_val
            exists = False
        if index:
            field = index + "." + field
        node.grow(val, field, exists=exists, in_array=bool(index))

    def read_array(self, node, field):
        """Grow children of ``node`` for ``field`` looked up in a list.

        Every element that is a mapping is searched for ``field``. If
        ``field`` is all digits it is additionally used as a position in
        the list itself, producing a ``picked`` child.
        """
        doc = node.value
        for i, elem in enumerate(doc):
            if isinstance(elem, self.map_cls):
                self.read_map(node, field, str(i), elem)
        if field.isdigit():
            try:
                val = doc[int(field)]
                exists = True
            except IndexError:
                val = _no_val
                exists = False
            node.grow(val, field, picked=True, exists=exists, in_array=True)
# Demo: resolve several dotted paths against one document, sharing a single
# tree so that nodes grown for one path are reused by the next.
doc = {"a": [{"b": 4}, {"b": 8}, 7, {"b": [5, 9]}], "c": {"d": 5}}
tree = FieldTree(doc)
traverser = FieldTreeTraverser()

for path in ("a.b", "c.d.e", "a.b.1", "c.d.f", "c.x.f"):
    fields = path.split(".")
    traverser.build(tree, fields)
    # "c.d.e" is only walked; no FieldValues is collected for it.
    if path != "c.d.e":
        val = FieldValues(traverser.leaves)

print(tree)

# The membership test iterates ``val`` and stops at the first hit, which
# leaves ``matched_node`` pointing at the node that produced it.
print("4 in val:", 4 in val)
print("MATCH", val.matched_node)
if val.matched_node:
    print(val.matched_node)
    print(val.matched_node.full_path())

print("full", list(val))
print("arrays", list(val.iter_arrays()))
print("values", val)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment