Skip to content

Instantly share code, notes, and snippets.

@aucampia
Last active July 16, 2021 19:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aucampia/11fb776325b4a1da053e2f6099895228 to your computer and use it in GitHub Desktop.
Save aucampia/11fb776325b4a1da053e2f6099895228 to your computer and use it in GitHub Desktop.
rdflib-reification
@prefix egi: <http://example.com/instance/> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix schema: <https://schema.org/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
egi:google a schema:Organization ;
schema:employee egi:bob ;
schema:name "google" .
egi:bob a schema:Person ;
schema:name "bob" .
[] a rdf:Statement ;
rdf:object egi:bob ;
rdf:predicate schema:employee ;
rdf:subject egi:google ;
schema:endDate "2020-01-01"^^xsd:date ;
schema:startDate "2019-01-01"^^xsd:date .
import sys
from typing import Set, Tuple
from rdflib import RDF, XSD, Literal, Namespace, SDO # type: ignore[import]
from rdflib.graph import Graph # type: ignore[import]
from rdflib.term import BNode, Node # type: ignore[import]
# A single RDF triple, held as a (subject, predicate, object) tuple of nodes.
TripleT = Tuple[Node, Node, Node]
# An unordered collection of distinct triples.
TripleSetT = Set[TripleT]
class RDFStatementHelper:
    """Helper for expressing a triple as an ``rdf:Statement`` resource."""

    @classmethod
    def from_triple(cls, triple: TripleT) -> Tuple[Node, TripleSetT]:
        """Reify ``triple`` on a freshly created blank node.

        Returns a pair of the new blank node and the set of four triples
        that type it as ``rdf:Statement`` and link it to the subject,
        predicate and object of ``triple``. Nothing is added to any graph
        here; the caller decides where the returned triples go.
        """
        node = BNode()
        description: TripleSetT = {
            (node, RDF.type, RDF.Statement),
            (node, RDF.subject, triple[0]),
            (node, RDF.predicate, triple[1]),
            (node, RDF.object, triple[2]),
        }
        return node, description
class GraphHelper:
    """Small convenience operations on a graph object."""

    @classmethod
    def add_triples(cls, graph: Graph, triples: TripleSetT) -> Graph:
        """Add each member of ``triples`` to ``graph`` via ``graph.add``.

        The same ``graph`` object that was passed in is returned, so the
        call can be chained.
        """
        for entry in triples:
            graph.add(entry)
        return graph
def test_reification() -> None:
    """Build a small graph containing a reified triple and print it as Turtle.

    The graph describes a person and an organization, states that the
    organization employs the person, and then attaches a start and end
    date to an ``rdf:Statement`` resource that describes that employment
    triple. The result is serialized to standard output.
    """
    EGI = Namespace("http://example.com/instance/")
    EGV = Namespace("http://example.com/vocab/")

    graph = Graph()
    for prefix, namespace in (("egi", EGI), ("egv", EGV), ("schema", SDO)):
        graph.bind(prefix, namespace)

    bob = EGI["bob"]
    google = EGI["google"]

    # Describe the person and the organization.
    for fact in (
        (bob, RDF.type, SDO.Person),
        (bob, SDO.name, Literal("bob")),
        (google, RDF.type, SDO.Organization),
        (google, SDO.name, Literal("google")),
    ):
        graph.add(fact)

    # The triple we want to reify is asserted in the graph as well.
    employment = (google, SDO.employee, bob)
    graph.add(employment)

    # Add the rdf:Statement description of that triple.
    statement_node, statement_triples = RDFStatementHelper.from_triple(employment)
    GraphHelper.add_triples(graph, statement_triples)

    # Annotate the statement resource (not the triple itself) with dates.
    for predicate, day in (
        (SDO.startDate, "2019-01-01"),
        (SDO.endDate, "2020-01-01"),
    ):
        graph.add((statement_node, predicate, Literal(day, datatype=XSD.date)))

    graph.serialize(destination=sys.stdout.buffer, format="turtle", encoding="utf8")
# Run the reification example only when this file is executed as a script.
if __name__ == "__main__":
    test_reification()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment