# Connect RDFLib's SPARQL Store to Amazon Neptune
"""
RDFLib SPARQLStore wrapper for use with Neptune
"""
import sys | |
import os | |
import warnings | |
from rdflib import URIRef, Graph, ConjunctiveGraph | |
from rdflib.plugins.stores.sparqlstore import ( | |
SPARQLStore, | |
SPARQLUpdateStore, | |
NSSPARQLWrapper, | |
) | |
from SPARQLWrapper import Wrapper, POST | |
# Graph identifier that main() uses for the first graph it writes to.
# NOTE(review): presumably the named graph Neptune assigns to triples stored
# without an explicit graph -- confirm against the Neptune SPARQL documentation.
NEPTUNE_DEFAULT_GRAPH = URIRef('http://aws.amazon.com/neptune/vocab/v01/DefaultNamedGraph')
class NeptuneWrapper(NSSPARQLWrapper):
    """NSSPARQLWrapper variant whose setQuery always selects the POST method."""

    def setQuery(self, query):
        """
        Store *query* as the SPARQL text to run, forcing the POST method.

        The query type is derived from the raw text; the stored query string
        is the text after prefix injection.
        """
        self.setMethod(POST)

        # Content negotiation only: per the original author's note this is
        # what makes Neptune return RDF/XML rather than nquads, and RDFLib
        # (as of 4.2.2) only supports RDF/XML results for CONSTRUCT.
        # https://github.com/RDFLib/sparqlwrapper/blob/master/SPARQLWrapper/Wrapper.py#L437
        self.onlyConneg = True

        detected_type = self._parseQueryType(query)
        self.queryType = detected_type

        prefixed_query = self.injectPrefixes(query)
        self.queryString = prefixed_query
class NeptuneStore(NeptuneWrapper, SPARQLStore):
    """SPARQLStore combined with NeptuneWrapper's POST-only setQuery."""

    def __init__(self, *args, **kwargs):
        """
        Initialise the store from keyword arguments.

        Positional arguments are accepted by the signature but are not
        forwarded to either initialiser call below.
        """
        # Each super() call starts method lookup *after* the class it names,
        # so the two calls reach different points of the MRO.
        # NOTE(review): which concrete __init__ each call lands on depends on
        # the installed rdflib class hierarchy -- confirm before reordering.
        super(NeptuneWrapper, self).__init__(**kwargs)
        super(SPARQLStore, self).__init__(**kwargs)
class NeptuneUpdateStore(NeptuneStore, SPARQLUpdateStore):
    """Read/write store: SPARQLUpdateStore combined with NeptuneStore."""

    def __init__(self, **kwargs):
        """
        Initialise the store, passing *kwargs* to the initialisers that follow
        NeptuneStore and SPARQLUpdateStore in the method resolution order.
        """
        # The class named in super() must be spelled correctly: the previous
        # misspelling raised NameError on every instantiation.
        super(NeptuneStore, self).__init__(**kwargs)
        super(SPARQLUpdateStore, self).__init__(**kwargs)
def connect(endpoint):
    """
    Build a NeptuneUpdateStore and open it against *endpoint*.

    The same URL is supplied as both elements of the pair handed to
    ``open``. Returns the opened store.
    """
    neptune_store = NeptuneUpdateStore()
    endpoint_pair = (endpoint, endpoint)
    neptune_store.open(endpoint_pair)
    return neptune_store
def main():
    """
    Test functionality by writing and reading some triples.

    Talks to a live SPARQL endpoint; replace the placeholder URL below with a
    real one before running.
    """
    # RDFS and Literal are not among the module-level rdflib imports, so they
    # are brought into scope here.
    from rdflib import RDFS, Literal

    endpoint = 'http://your-neptune-endpoint:8182/sparql'
    # Reuse the module's helper instead of repeating the construct-and-open steps.
    store = connect(endpoint)

    # Add triples to the default graph
    ng = Graph(store, identifier=NEPTUNE_DEFAULT_GRAPH)
    uri = URIRef("http://sample.org/hi")
    trip = (uri, RDFS.label, Literal("Statement"))
    ng.add(trip)
    print(ng.value(subject=uri, predicate=RDFS.label))

    # Remove these
    ng.remove(trip)

    # Add triples to a new named graph
    my_graph = URIRef("http://tmp.namespace.org/entity1")
    my_ng = Graph(store, identifier=my_graph)
    my_ng.add(trip)
    # NOTE(review): this lookup goes through `ng`, not `my_ng` -- confirm
    # that reading via the default graph is what was intended here.
    print(ng.value(subject=uri, predicate=RDFS.label))
    print(len(my_ng))

    # Clean up the named graph created above.
    store.remove_graph(my_ng)
# Run the sample write/read round-trip only when executed as a script,
# not when this module is imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment