Skip to content

Instantly share code, notes, and snippets.

@chiral
Last active May 4, 2016 12:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chiral/57012c6044e294def637 to your computer and use it in GitHub Desktop.
Save chiral/57012c6044e294def637 to your computer and use it in GitHub Desktop.
RDF generation helper built on rdflib
from rdfhelper import *
# Base URI of the vocabulary used by the recipes below; it is handed to
# GraphEx, which binds it as the graph's default (empty-prefix) namespace.
AD5 = 'http://adfive.net/o/v1#'
def make_recipe1():
    """Build the fully typed "recipe1" graph and write it out as Turtle.

    The graph describes a LoadCSV step that reads from a DataSource into a
    DataFrame with the columns "Cost" and "CV", a Plot2d of Cost against CV,
    and an RlangProvider that holds the load step, the frame and the plot.
    Every resource is a blank node.  The result is serialized to
    ``owlrdf/recipe1.ttl``.
    """
    g = GraphEx(AD5)
    b = g.BNodeEx()

    # Load step and its input.
    loader = b.load_csv.a(g.LoadCSV)
    loader.source(b.in_file.a(g.DataSource))

    # Output frame and its two columns.
    frame = b.df.a(g.DataFrame)
    cost = b.Cost.a(g.DataColumn).colname("Cost")
    cv = b.CV.a(g.DataColumn).colname("CV")
    frame.hasColumn(cost, cv)
    loader.destination(frame)

    # Plot of the two columns; ``out_pict`` is left untyped.
    plot = b.plot.a(g.Plot2d)
    plot.plotX(cost)
    plot.plotY(cv)
    plot.plotTo(b.out_pict)

    provider = b.R.a(g.RlangProvider)
    provider.holds(loader, frame, plot)

    g.graph.serialize('owlrdf/recipe1.ttl', format='turtle')
def make_recipe1_for_infer():
    """Build the "recipe1" graph without any rdf:type triples.

    The same property triples as ``make_recipe1`` are emitted (source,
    destination, hasColumn, colname, plotX, plotY, plotTo, holds), but no
    node is given a type.  NOTE(review): the name suggests the types are
    meant to be derived by a reasoner afterwards -- confirm.  The result is
    serialized to ``owlrdf/recipe1_for_infer.ttl``.
    """
    g = GraphEx(AD5)
    b = g.BNodeEx()

    loader = b.load_csv
    loader.source(b.in_file)

    frame = b.df
    cost = b.Cost.colname("Cost")
    cv = b.CV.colname("CV")
    frame.hasColumn(cost, cv)
    loader.destination(frame)

    plot = b.plot
    plot.plotX(cost)
    plot.plotY(cv)
    plot.plotTo(b.out_pict)

    b.R.holds(loader, frame, plot)

    g.graph.serialize('owlrdf/recipe1_for_infer.ttl', format='turtle')
# Script entry point: regenerate both recipe files, typed variant first.
if __name__ == "__main__":
    for build in (make_recipe1, make_recipe1_for_infer):
        build()
from rdflib import URIRef,BNode,Literal,Graph,Namespace
from rdflib.namespace import RDF,RDFS,OWL,XSD,NamespaceManager
class GraphEx:
    """An rdflib ``Graph`` paired with a default namespace.

    Attribute access that is not satisfied by a real attribute is turned
    into a term of the default namespace, wrapped in a ``NodeEx`` so that
    triples can be added fluently (``g.DataFrame`` -> node for that term).
    """

    def __init__(self, NS_URI, **kwargs):
        """Create an empty graph and register its namespaces.

        Args:
            NS_URI: URI of the default namespace; bound to the empty prefix.
            **kwargs: extra ``prefix=uri`` pairs to bind on the graph.
        """
        self.graph = Graph()
        self.ns = Namespace(NS_URI)
        self.graph.namespace_manager.bind('', self.ns)
        for prefix, uri in kwargs.items():
            self.graph.namespace_manager.bind(prefix, Namespace(uri))

    def BNodeEx(self):
        """Return a ``BNodes`` factory that shares this graph and namespace."""
        return BNodes(self.graph, self.ns)

    def __getattr__(self, name):
        """Resolve ``name`` in the default namespace and wrap it in a NodeEx.

        Raises:
            AttributeError: for dunder names and for this object's own state
                attributes when they have not been set yet.
        """
        # __getattr__ only runs when normal lookup failed.  If ``graph`` or
        # ``ns`` get here the instance is not initialised (e.g. while copy or
        # pickle rebuilds it); reading self.ns below would then re-enter this
        # method without end.  Dunder probes are protocol lookups, never
        # vocabulary terms.
        if name in ("graph", "ns") or (
                name.startswith("__") and name.endswith("__")):
            raise AttributeError(name)
        # __getattr__ is invoked explicitly, presumably so that names which
        # collide with real attributes of the namespace object still become
        # vocabulary terms -- confirm against rdflib's Namespace.
        node = self.ns.__getattr__(name)
        return NodeEx(self.graph, self.ns, node)
class BNodes:
    """Lazy, name-keyed factory of blank nodes.

    The first access to ``b.some_name`` creates a fresh ``BNode`` wrapped in
    a ``NodeEx``; later accesses with the same name return that same wrapper,
    so one name always denotes one blank node within this factory.
    """

    def __init__(self, graph, ns):
        """Remember the graph and namespace handed to every created node."""
        self.graph = graph
        self.ns = ns
        self.bnodes = {}  # attribute name -> NodeEx wrapping its blank node

    def __getattr__(self, name):
        """Return the blank node registered under ``name``, creating it once.

        Raises:
            AttributeError: for dunder names and for this object's own state
                attributes when they have not been set yet.
        """
        # Without this guard an uninitialised instance (e.g. one being rebuilt
        # by copy or pickle) recurses forever on self.bnodes, and every
        # protocol probe such as __deepcopy__ or __len__ would silently be
        # registered as a blank node.
        if name in ("graph", "ns", "bnodes") or (
                name.startswith("__") and name.endswith("__")):
            raise AttributeError(name)
        if name not in self.bnodes:
            self.bnodes[name] = NodeEx(self.graph, self.ns, BNode())
        return self.bnodes[name]
class NodeEx:
    """A graph node bundled with the graph and namespace it belongs to.

    Unknown attributes are looked up in the namespace and returned as
    ``PredicateEx`` objects, which gives the fluent form
    ``node.somePredicate(obj, ...)``.
    """

    def __init__(self, graph, ns, node):
        """Store the owning graph, the namespace and the wrapped node."""
        self.graph = graph
        self.ns = ns
        self.node = node

    def a(self, t):
        """Add ``(self.node, rdf:type, t.node)`` to the graph.

        Args:
            t: wrapper exposing the class term as ``t.node``.

        Returns:
            This wrapper, so that calls can be chained.
        """
        self.graph.add((self.node, RDF.type, t.node))
        return self

    def __getattr__(self, name):
        """Return a ``PredicateEx`` for the namespace term called ``name``.

        Raises:
            AttributeError: for dunder names and for this object's own state
                attributes when they have not been set yet.
        """
        # Reached only when normal lookup failed.  If one of the state
        # attributes lands here the instance is uninitialised, and reading
        # self.ns below would re-enter this method without end.  Dunder
        # probes are protocol lookups and must not become predicates.
        if name in ("graph", "ns", "node") or (
                name.startswith("__") and name.endswith("__")):
            raise AttributeError(name)
        # __getattr__ is invoked explicitly, presumably so that names which
        # collide with real attributes of the namespace object still become
        # vocabulary terms -- confirm against rdflib's Namespace.
        p = self.ns.__getattr__(name)
        return PredicateEx(self, p)
class PredicateEx:
    """A (subject, predicate) pair that adds triples when called.

    ``PredicateEx(subject, p)(o1, o2, ...)`` adds one triple
    ``(subject.node, p, o)`` per argument to ``subject.graph`` and returns
    the subject, so that further predicates can be chained on it.
    """

    # rdflib term classes that are added to the graph unchanged.
    RDFLIBTypes = (Literal, BNode, URIRef)

    # Plain text types that get wrapped in an xsd:string Literal.
    # ``basestring`` exists only on Python 2; fall back to ``str`` elsewhere.
    try:
        StringTypes = (basestring,)
    except NameError:
        StringTypes = (str,)

    def __init__(self, subject, predicate):
        """Bind the subject wrapper to the predicate term."""
        self.subject = subject
        self.predicate = predicate

    def __call__(self, *oo):
        """Add one triple per object in ``oo``.

        Args:
            *oo: objects of the triples.  A ``NodeEx`` contributes its
                wrapped node, an rdflib term is used as is, and a plain
                string becomes a Literal typed xsd:string.

        Returns:
            The subject wrapper, to allow chaining.

        Raises:
            TypeError: if an object is none of the supported kinds.  Triples
                for the arguments before it have already been added.
        """
        s = self.subject
        p = self.predicate
        rdflib_types = tuple(self.RDFLIBTypes)
        for o in oo:
            if isinstance(o, NodeEx):
                s.graph.add((s.node, p, o.node))
            # NOTE(review): rdflib terms appear to subclass the text type, so
            # this branch must stay ahead of the plain-string one -- confirm.
            elif isinstance(o, rdflib_types):
                s.graph.add((s.node, p, o))
            elif isinstance(o, self.StringTypes):
                s.graph.add((s.node, p, Literal(o, datatype=XSD.string)))
            else:
                # Raising a bare string is itself an error; raise a real
                # exception that names the offending type.
                raise TypeError(
                    "unknown object type: %s" % type(o).__name__)
        return s
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment