Skip to content

Instantly share code, notes, and snippets.

@mwatts15
Created January 30, 2015 19:07
Show Gist options
  • Save mwatts15/171b8936db4b79d02acc to your computer and use it in GitHub Desktop.
Demo graph traversals for mapping object graphs to RDF
from __future__ import print_function
import pprint as PP
class A(object):
    """Node of the demo object graph.

    Attributes:
        p: dict mapping a title to the node registered as this node's
            parent under that title (filled by ``set_parent``).
        o: dict mapping a title to the node that registered this node as
            its parent under that title (the reverse side of ``p``).
        var: variable name used to label the node while it has no
            identifier; ``None`` until assigned.
        ident: identifier of the node; ``None`` until assigned.
    """

    def __init__(self):
        self.o = {}
        self.p = {}
        self.var = None
        self.ident = None

    @property
    def defined(self):
        """True when an identifier has been assigned to this node."""
        return self.ident is not None

    def set_parent(self, title, other):
        """Link *other* as this node's parent under *title*.

        Both directions are recorded: ``self.p[title]`` points at *other*
        and ``other.o[title]`` points back at this node.
        """
        other.o[title] = self
        self.p[title] = other

    @property
    def idl(self):
        """Label of the node: the identifier if defined, else ``"?" + var``.

        For an undefined node, ``var`` must have been set (checked with an
        ``assert``).
        """
        if not self.defined:
            assert(self.var is not None)
            return "?"+self.var
        return self.ident
class N(object):
    """Result node produced by a graph traversal.

    Attributes:
        path: list of entries making up this node's own path (starts empty).
        subpaths: list of nested ``N`` objects hanging below this node
            (starts empty).
    """

    def __init__(self):
        self.subpaths = []
        self.path = []

    def __str__(self):
        """Return ``N(path=...,subpaths=...)`` with balanced parentheses."""
        # The closing ")" was previously missing, which left every rendered
        # node (and any nested rendering) with unbalanced parentheses.
        return "N(path={},subpaths={})".format(self.path, self.subpaths)

    def __repr__(self):
        """Same text as ``__str__`` so nested nodes render readably in lists."""
        return self.__str__()
class U(object):
    """Traversal that builds a tree of ``N`` path objects from a start node.

    The walk follows each node's ``p`` mapping and then its ``o`` mapping
    (labels from ``o`` are prefixed with ``"^"``). A branch is kept only
    when it reaches a node whose ``defined`` attribute is true. Progress is
    traced with ``print`` calls indented by the current recursion depth.

    Attributes:
        seen: stack of the nodes on the current recursion path; used both
            to stop at nodes already being visited and to size the trace
            indentation.
    """

    def __init__(self):
        self.seen = []

    def b(self, CUR, LIST, IS_INV):
        """Explore every neighbour of *CUR* listed in *LIST*.

        Args:
            CUR: the node whose neighbours are being explored.
            LIST: mapping of title -> neighbour node.
            IS_INV: when true, titles are recorded with a ``"^"`` prefix.

        Returns:
            ``(found, paths)`` where *found* is true if any neighbour led
            to a defined node and *paths* holds the ``N`` objects of the
            successful neighbours, most recently explored first.
        """
        found_any = False
        collected = []
        for title in LIST:
            neighbour = LIST[title]
            outcome = self.g(neighbour)
            # Trace the raw result before the edge is prepended to it.
            print(" "*len(self.seen)*4, title, outcome)
            label = "^"+title if IS_INV else title
            reachable, sub = outcome
            if not reachable:
                continue
            found_any = True
            # Prepend the edge leading from CUR to this neighbour.
            sub.path.insert(0, (CUR.idl, label, neighbour.idl))
            collected.insert(0, sub)
        return found_any, collected

    def g(self, current_node):
        """Visit *current_node* and return ``(found, N)`` for it.

        A defined node yields ``(True, N())``; a node already on the
        current recursion path yields ``(False, N())``. Otherwise both
        mappings are explored: a single resulting sub-path is returned
        directly, any other count is wrapped in a fresh ``N`` whose
        ``subpaths`` holds them.
        """
        print(" "*len(self.seen)*4, "AT {} WITH {}".format(current_node.idl, [x.idl for x in self.seen]))
        if current_node.defined:
            return True, N()
        if current_node in self.seen:
            return False, N()

        self.seen.append(current_node)
        forward_ok, forward_paths = self.b(current_node, current_node.p, False)
        inverse_ok, inverse_paths = self.b(current_node, current_node.o, True)
        self.seen.pop()

        merged = forward_paths+inverse_paths
        if len(merged) == 1:
            result = merged[0]
        else:
            result = N()
            result.subpaths = merged
        return (forward_ok or inverse_ok, result)

    def __call__(self, current_node):
        """Run the traversal from *current_node*; same return value as ``g``."""
        return self.g(current_node)
class V(object):
    """Traversal that collects edge triples leading to defined nodes.

    Attributes:
        seen: stack of the nodes on the current recursion path, used to
            stop at nodes that are already being visited.
        results: set of ``(idl, title, idl)`` triples accumulated so far.
    """

    def __init__(self):
        self.seen = []
        self.results = set()

    def g(self, current_node):
        """Visit *current_node*; return True if a defined node is reachable.

        For every neighbour in ``p`` that succeeds, the triple
        ``(current.idl, title, neighbour.idl)`` is recorded; for every
        neighbour in ``o`` that succeeds, the triple is recorded the other
        way round as ``(neighbour.idl, title, current.idl)``.
        """
        if current_node.defined:
            return True
        if current_node in self.seen:
            return False

        self.seen.append(current_node)
        found = False
        for title, parent in current_node.p.items():
            if self.g(parent):
                found = True
                self.results.add((current_node.idl, title, parent.idl))
        for title, child in current_node.o.items():
            if self.g(child):
                found = True
                self.results.add((child.idl, title, current_node.idl))
        self.seen.pop()
        return found

    def __call__(self, current_node):
        """Traverse from *current_node* and return the accumulated triples."""
        self.g(current_node)
        return self.results
def B(q, n):
    """Print the tree rooted at *q*, indented by *n* spaces.

    When *q* has sub-paths, a ``Join`` line is printed first and each
    sub-path is printed recursively with four more spaces of indentation.
    A ``Query`` line with ``q.path`` is always printed last.
    """
    pad = " "*n
    children = q.subpaths
    if len(children) > 0:
        print(pad, "Join")
        for child in children:
            B(child, n+4)
    print(pad, "Query", q.path)
if __name__ == "__main__":
    # Create the seven demo nodes in a fixed order; `e` stays unlabelled
    # and unconnected.
    z, a, b, c, d, e, f = (A() for _ in range(7))

    # Every connected node is a variable except `d`, which gets an
    # identifier and is therefore the only "defined" node.
    for node, name in ((z, "z"), (a, "a"), (b, "b"), (c, "c"), (f, "f")):
        node.var = name
    d.ident = "d"

    # (child, title, parent) links, applied in this exact order.
    for child, title, parent in (
        (z, 'q', a),
        (a, 'x', b),
        (b, 'y', c),
        (b, 'k', c),
        (d, 'v', c),
        (f, 'u', a),
    ):
        child.set_parent(title, parent)

    # Traverse from `z`, dump the raw result, then pretty-print the tree.
    u = U()(z)
    PP.pprint(u)
    B(u[1], 0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment