Last active
June 21, 2020 06:20
-
-
Save justinfay/65f17999f9d015d81d5a3e39d6a13dcd to your computer and use it in GitHub Desktop.
A Python fork of http://aosabook.org/en/500L/dagoba-an-in-memory-graph-database.html
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import partial, wraps | |
# Registry of every pipetype, keyed by the decorated function's name.
pipetypes = {}


def pipetype(func):
    """
    Register ``func`` as a pipetype and return a thin wrapper for it.

    The undecorated function is stored in ``pipetypes`` under its
    ``__name__``. The returned wrapper only forwards its arguments to
    ``func`` and carries ``func``'s metadata.
    """
    def forward(*positional, **named):
        return func(*positional, **named)

    pipetypes[func.__name__] = func
    return wraps(func)(forward)
@pipetype | |
def vector(graph, *ids):
    """
    Seed a pipeline with vertices taken from ``graph``.

    With no ``ids`` every vertex in ``graph.vertices`` is produced, in
    list order. Otherwise the vertex stored in ``graph.vertex_index``
    under each id is produced, in the order the ids were given; ids
    that are not in the index are silently skipped.
    """
    if not ids:
        # This is a generator function, so ``return <list>`` would
        # produce nothing at all; the vertices have to be yielded.
        # The slice snapshots the list so vertices added while the
        # pipeline is being consumed do not appear mid-iteration.
        yield from graph.vertices[:]
        return
    index = graph.vertex_index
    for id_ in ids:
        try:
            vertex = index[id_]
        except KeyError:
            # Unknown id: it contributes nothing to the pipeline.
            continue
        yield vertex
@pipetype | |
def find(pipeline, *args):
    """
    Keep the items that contain at least one of the keys in ``args``.

    Each item is tested with ``key in item`` and is produced at most
    once, however many keys it contains. With no ``args`` every item
    passes through unchanged.
    """
    for candidate in pipeline:
        if not args or any(key in candidate for key in args):
            yield candidate
@pipetype | |
def nth(pipeline, n):
    """
    Produce only the item at zero-based position ``n`` of ``pipeline``.

    Nothing is produced when the pipeline has fewer than ``n + 1``
    items. The upstream pipeline is not pulled any further once the
    item has been found.
    """
    for position, item in enumerate(pipeline):
        if position == n:
            yield item
            return
@pipetype | |
def unique(pipeline):
    """
    Drop items whose ``'_id'`` has already been seen.

    The first item carrying a given ``'_id'`` is produced; later items
    with the same ``'_id'`` are skipped. Every item must support
    ``item['_id']`` and the id must be hashable.
    """
    emitted = set()
    for item in pipeline:
        identifier = item['_id']
        if identifier in emitted:
            continue
        emitted.add(identifier)
        yield item
@pipetype | |
def filter(pipeline, func):
    """
    Keep the items for which ``func(item)`` is truthy.

    Note that this pipetype shadows the builtin ``filter`` in this
    module and takes its arguments in the opposite order.
    """
    yield from (item for item in pipeline if func(item))
@pipetype | |
def take(pipeline, n):
    """
    Produce at most the first ``n`` items of ``pipeline``.

    Nothing is produced when ``n`` is zero or negative. Once ``n``
    items have been produced the upstream pipeline is not pulled
    again, so no extra item is consumed from it.
    """
    if n <= 0:
        # Without this guard the counter is only compared after the
        # first item has gone out, so ``take(0)`` would never stop.
        return
    for produced, item in enumerate(pipeline, start=1):
        yield item
        if produced >= n:
            return
@pipetype | |
def in_(pipeline, **kwargs):
    """
    Follow the edges listed under ``'_in'`` of every pipeline item.

    For each followed edge the value stored at the edge's ``'_in'``
    key is produced. With no ``kwargs`` every edge is followed.
    Otherwise an edge is followed, once, when at least one
    ``key=value`` pair equals the edge's own value for that key.
    An edge that lacks a filtered key does not match on that key
    (it is not an error).
    """
    for item in pipeline:
        for edge in item['_in']:
            # The ``key in edge`` guard keeps edges without the
            # attribute from raising KeyError during filtering.
            if not kwargs or any(
                    key in edge and edge[key] == wanted
                    for key, wanted in kwargs.items()):
                yield edge['_in']
@pipetype | |
def out(pipeline, **kwargs):
    """
    Follow the edges listed under ``'_out'`` of every pipeline item.

    For each followed edge the value stored at the edge's ``'_out'``
    key is produced. With no ``kwargs`` every edge is followed.
    Otherwise an edge is followed, once, when at least one
    ``key=value`` pair equals the edge's own value for that key.
    An edge that lacks a filtered key does not match on that key
    (it is not an error).
    """
    for item in pipeline:
        for edge in item['_out']:
            # The ``key in edge`` guard keeps edges without the
            # attribute from raising KeyError during filtering.
            if not kwargs or any(
                    key in edge and edge[key] == wanted
                    for key, wanted in kwargs.items()):
                yield edge['_out']
@pipetype | |
def attr(pipeline, attr):
    """
    Replace every item with the value it stores under key ``attr``.

    The lookup is ``item[attr]``, so an item without that key raises
    whatever the item's ``__getitem__`` raises.
    """
    yield from (item[attr] for item in pipeline)
@pipetype | |
def drop(pipeline, **kwargs):
    """
    Remove the items that match any of the given ``key=value`` pairs.

    An item is removed as soon as ``item.get(key) == value`` holds for
    one pair; all other items pass through. With no ``kwargs`` nothing
    is removed.
    """
    for item in pipeline:
        rejected = any(
            item.get(key) == unwanted for key, unwanted in kwargs.items())
        if not rejected:
            yield item
class Graph:
    """
    In-memory graph built from plain dict vertices and dict edges.

    ``vertices`` and ``edges`` hold everything in the order it was
    added and ``vertex_index`` maps each vertex ``'_id'`` to its
    vertex dict. The dicts handed in are stored and modified in
    place; they are not copied.
    """

    def __init__(self, vertices=None, edges=None):
        self.vertices = []
        self.edges = []
        self.vertex_index = {}
        # Vertices are loaded first because edges refer to them by id.
        for load, items in ((self.add_vertices, vertices),
                            (self.add_edges, edges)):
            if items is not None:
                load(items)

    def add_vertices(self, vertices):
        """Add every vertex of the iterable ``vertices``, in order."""
        for vertex in vertices:
            self.add_vertex(vertex)

    def add_edges(self, edges):
        """Add every edge of the iterable ``edges``, in order."""
        for connection in edges:
            self.add_edge(connection)

    def add_vertex(self, vertex):
        """
        Store ``vertex`` in the graph and return its ``'_id'``.

        A vertex without an ``'_id'`` is given ``id(vertex)``. Empty
        ``'_out'`` and ``'_in'`` edge lists are added when missing.
        Raises ``ValueError`` when the supplied ``'_id'`` is already
        in the graph.
        """
        if '_id' in vertex:
            if vertex['_id'] in self.vertex_index:
                raise ValueError(
                    'vertex with ID {} in graph already'.format(vertex['_id']))
        else:
            vertex['_id'] = id(vertex)
        vertex_id = vertex['_id']
        for side in ('_out', '_in'):
            vertex.setdefault(side, [])
        self.vertex_index[vertex_id] = vertex
        self.vertices.append(vertex)
        return vertex_id

    def add_edge(self, edge):
        """
        Attach ``edge`` to the two vertices it names.

        ``edge['_in']`` and ``edge['_out']`` must be ids of vertices
        already in the graph, otherwise ``KeyError`` is raised before
        anything is modified. The edge is appended to the ``'_out'``
        list of the vertex named by ``'_in'`` and to the ``'_in'``
        list of the vertex named by ``'_out'``. Afterwards both edge
        keys hold the vertex dicts themselves instead of their ids.
        """
        tail = self.vertex_index[edge['_in']]
        head = self.vertex_index[edge['_out']]
        tail['_out'].append(edge)
        head['_in'].append(edge)
        edge['_in'], edge['_out'] = tail, head
        self.edges.append(edge)
class _UnknownPipetype(AttributeError, ValueError):
    """
    Raised when a query step name is not a registered pipetype.

    It is an ``AttributeError`` so that ``hasattr``, ``getattr`` with a
    default and ``copy.deepcopy`` behave normally on ``Query`` objects,
    and it is still a ``ValueError`` for callers that catch that type.
    """


class Query:
    """
    Base class for lazy querying.

    Steps are recorded as they are chained and are only turned into
    generators when the query is iterated.
    """

    def __init__(self, graph=None):
        self.graph = graph
        # The generators built by the most recent iteration, one per step.
        self.pipeline = []
        # A series of (pipetype, args, kwargs) triples, in call order.
        self.steps = []

    def __getattr__(self, attr):
        """
        Used to dynamically add steps to our query.

        Python only calls this for names that normal attribute lookup
        did not find. The pipetype is not run here, to keep the query
        lazy: the returned callable records the step together with
        the arguments it is eventually called with.

        Raises an error that is both ``AttributeError`` and
        ``ValueError`` when ``attr`` is not a registered pipetype.
        """
        try:
            pipetype = pipetypes[attr]
        except KeyError:
            raise _UnknownPipetype(
                'Not a valid pipetype "{}"'.format(attr)) from None
        return partial(self._add_pipetype, pipetype)

    def _add_pipetype(self, pipetype, *args, **kwargs):
        """Record one step and return ``self`` so calls can be chained."""
        self.steps.append((pipetype, args, kwargs))
        return self

    def __iter__(self):
        """
        Build the generator chain from the recorded steps.

        The first step receives ``self.graph``; every later step
        receives the result of the step before it. The chain is
        rebuilt on every call, so a query can be iterated repeatedly.
        A query without any steps iterates as empty.
        """
        self.pipeline = []
        last = self.graph
        for pipetype, args, kwargs in self.steps:
            last = pipetype(last, *args, **kwargs)
            self.pipeline.append(last)
        if not self.pipeline:
            # Nothing was chained: there is no generator to pull from.
            return iter(())
        return iter(last)

    def all(self):
        """Run the query and return every result in a list."""
        return list(self)

    def add_pipeline(self, pipeline):
        """
        Append the steps of another query to this one.

        ``pipeline`` is just another query; only its recorded steps
        are used, its graph is ignored. Returns ``self``.
        """
        self.steps.extend(pipeline.steps)
        return self
if __name__ == "__main__":
    # Demo: a small family tree. One vertex per person, identified by name.
    justin, marta, julia, paul, jacinta, don, rosario = (
        {'_id': name}
        for name in (
            'justin', 'marta', 'julia', 'paul', 'jacinta', 'don', 'rosario')
    )
    graph = Graph(vertices=[justin, marta, julia, paul, jacinta, don, rosario])

    # Each triple is (edge '_in' id, edge '_out' id, relationship type).
    relations = [
        ('justin', 'julia', 'parent'),
        ('justin', 'paul', 'parent'),
        ('marta', 'julia', 'parent'),
        ('marta', 'paul', 'parent'),
        ('julia', 'paul', 'sibling'),
        ('paul', 'julia', 'sibling'),
        ('jacinta', 'justin', 'parent'),
        ('don', 'justin', 'parent'),
        ('rosario', 'marta', 'parent'),
    ]
    graph.add_edges([
        {'_in': source, '_out': target, 'type': kind}
        for source, target, kind in relations
    ])

    # All of Julia's brothers or sisters.
    query = (
        Query(graph)
        .vector('julia')
        .in_()
        .out()
        .drop(_id='julia')
        .unique()
        .attr('_id')
    )
    print(query.all())

    # All of Paul's brothers or sisters.
    query = (
        Query(graph)
        .vector('paul')
        .out(type='sibling')
        .attr('_id')
    )
    print(query.all())

    # Julia and Paul's grandparents.
    query = (
        Query(graph)
        .vector('paul', 'julia')
        .in_(type='parent')
        .unique()
        .in_(type='parent')
        .attr('_id')
    )
    print(query.all())

    # We should be able to get aliases for free: a graph-less query is
    # just a reusable list of steps that can be appended to another query.
    grandparents = (
        Query()
        .in_(type='parent')
        .unique()
        .in_(type='parent')
        .attr('_id')
    )
    query = Query(graph).vector('paul', 'julia')
    print(query.add_pipeline(grandparents).all())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment