Skip to content

Instantly share code, notes, and snippets.

@justinfay
Last active June 21, 2020 06:20
Show Gist options
  • Save justinfay/65f17999f9d015d81d5a3e39d6a13dcd to your computer and use it in GitHub Desktop.
Save justinfay/65f17999f9d015d81d5a3e39d6a13dcd to your computer and use it in GitHub Desktop.
from functools import partial, wraps
pipetypes = {}
def pipetype(func):
pipetypes[func.__name__] = func
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
@pipetype
def vector(graph, *ids):
if not ids:
return graph.vertices[:]
for id_ in ids:
try:
yield graph.vertex_index[id_]
except KeyError:
pass
@pipetype
def find(pipeline, *args):
"""
A special pipetype that only ever receives
a send once with the graph.
"""
for item in pipeline:
if not args:
yield item
for key in args:
if key in item:
yield item
break
@pipetype
def nth(pipeline, n):
i = 0
for _ in pipeline:
if i == n:
yield _
return
i += 1
@pipetype
def unique(pipeline):
seen = set()
for _ in pipeline:
if _['_id'] not in seen:
seen.add(_['_id'])
yield _
@pipetype
def filter(pipeline, func):
for _ in pipeline:
if func(_):
yield _
@pipetype
def take(pipeline, n):
i = 0
for _ in pipeline:
yield _
i += 1
if i == n:
return
@pipetype
def in_(pipeline, **kwargs):
for item in pipeline:
for edge in item['_in']:
if kwargs:
for k, v in kwargs.items():
if edge[k] == v:
yield edge['_in']
break
else:
yield edge['_in']
@pipetype
def out(pipeline, **kwargs):
for item in pipeline:
for edge in item['_out']:
if kwargs:
for k, v in kwargs.items():
if edge[k] == v:
yield edge['_out']
break
else:
yield edge['_out']
@pipetype
def attr(pipeline, attr):
for _ in pipeline:
yield _[attr]
@pipetype
def drop(pipeline, **kwargs):
for _ in pipeline:
for k, v in kwargs.items():
if _.get(k) == v:
break
else:
yield _
class Graph:
def __init__(self, vertices=None, edges=None):
self.vertices = []
self.edges = []
self.vertex_index = {}
if vertices is not None:
self.add_vertices(vertices)
if edges is not None:
self.add_edges(edges)
def add_vertices(self, vertices):
for vertice in vertices:
self.add_vertex(vertice)
def add_edges(self, edges):
for edge in edges:
self.add_edge(edge)
def add_vertex(self, vertex):
if '_id' not in vertex:
vertex['_id'] = id(vertex)
elif vertex['_id'] in self.vertex_index:
raise ValueError(
'vertex with ID {} in graph already'.format(vertex['_id']))
vertex.setdefault('_out', [])
vertex.setdefault('_in', [])
self.vertex_index[vertex['_id']] = vertex
self.vertices.append(vertex)
return vertex['_id']
def add_edge(self, edge):
in_ = self.vertex_index[edge['_in']]
out_ = self.vertex_index[edge['_out']]
in_['_out'].append(edge)
out_['_in'].append(edge)
edge['_in'] = in_
edge['_out'] = out_
self.edges.append(edge)
class Query:
"""
Base class for lazy querying.
"""
def __init__(self, graph=None):
self.graph = graph
# A serious of pipetypes that make up the pipeline.
self.pipeline = []
self.steps = []
def __getattr__(self, attr):
"""
Used to dynamically add steps to our query.
Note the filters aren't called here as we need
to maintain laziness. Therefore the steps are
recorded and later when we try to pull a value
through our pipeline that will start generating.
"""
try:
pipetype = pipetypes[attr]
except KeyError:
raise ValueError('Not a valid pipetype "{}"'.format(attr))
return partial(self._add_pipetype, pipetype)
def _add_pipetype(self, pipetype, *args, **kwargs):
self.steps.append((pipetype, args, kwargs))
# Return self to allow chaining style.
return self
def __iter__(self):
self.pipeline = []
last = self.graph
for pipetype, args, kwargs in self.steps:
last = pipetype(last, *args, **kwargs)
self.pipeline.append(last)
return iter(self.pipeline[-1])
def all(self):
return [_ for _ in self]
def add_pipeline(self, pipeline):
"""
pipeline is just another query.
"""
self.steps.extend(pipeline.steps)
return self
if __name__ == "__main__":
justin = {
'_id': 'justin',
}
marta = {
'_id' : 'marta',
}
julia = {
'_id' : 'julia',
}
paul = {
'_id': 'paul',
}
jacinta = {
'_id': 'jacinta'
}
don = {
'_id': 'don'
}
rosario = {
'_id': 'rosario'
}
graph = Graph(vertices=[justin, marta, julia, paul, jacinta, don, rosario])
graph.add_edges([
{'_in': 'justin', '_out': 'julia', 'type': 'parent'},
{'_in': 'justin', '_out': 'paul', 'type': 'parent'},
{'_in': 'marta', '_out': 'julia', 'type': 'parent'},
{'_in': 'marta', '_out': 'paul', 'type': 'parent'},
{'_in': 'julia', '_out': 'paul', 'type': 'sibling'},
{'_in': 'paul', '_out': 'julia', 'type': 'sibling'},
{'_in': 'jacinta', '_out': 'justin', 'type': 'parent'},
{'_in': 'don', '_out': 'justin', 'type': 'parent'},
{'_in': 'rosario', '_out': 'marta', 'type': 'parent'},
])
# All julias brothers or sisters.
query = Query(graph).vector('julia').in_().out().drop(_id='julia').unique().attr('_id')
print(query.all())
# All pauls brothers or sisters.
query = Query(graph).vector('paul').out(type='sibling').attr('_id')
print(query.all())
# Julia and pauls grandparents.
query = Query(graph).vector('paul', 'julia').in_(type='parent').unique(
).in_(type='parent').attr('_id')
print(query.all())
# We should be able to get aliases for free.
grandparents = Query().in_(type='parent').unique().in_(type='parent').attr('_id')
query = Query(graph).vector('paul', 'julia')
print(query.add_pipeline(grandparents).all())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment