Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Connect RDFLib's SPARQL Store to Amazon Neptune
"""
RDFLib SPARQLStore wrapper for use with Neptune
"""
import os
import sys
import warnings

from rdflib import ConjunctiveGraph, Graph, Literal, RDFS, URIRef
from rdflib.plugins.stores.sparqlstore import (
    NSSPARQLWrapper,
    SPARQLStore,
    SPARQLUpdateStore,
)
from SPARQLWrapper import POST, Wrapper
# IRI used as the graph identifier when main() writes to Neptune's default graph.
NEPTUNE_DEFAULT_GRAPH = URIRef('http://aws.amazon.com/neptune/vocab/v01/DefaultNamedGraph')
class NeptuneWrapper(NSSPARQLWrapper):
    """NSSPARQLWrapper whose ``setQuery`` always switches the request to POST."""

    def setQuery(self, query):
        """
        Record the SPARQL query text and force the HTTP method to POST.

        :param query: SPARQL query text; its query type is parsed and the
            wrapper's prefixes are injected before it is stored.
        """
        self.setMethod(POST)

        # Needed so that Neptune answers with RDF/XML instead of nquads.
        # RDFLib only supports RDF/XML results for CONSTRUCT as of 4.2.2
        # https://github.com/RDFLib/sparqlwrapper/blob/master/SPARQLWrapper/Wrapper.py#L437
        self.onlyConneg = True

        detected_type = self._parseQueryType(query)
        self.queryType = detected_type

        prefixed_query = self.injectPrefixes(query)
        self.queryString = prefixed_query
class NeptuneStore(NeptuneWrapper, SPARQLStore):
    """
    SPARQLStore that takes its ``setQuery`` from NeptuneWrapper.

    NeptuneWrapper is listed first in the bases, so its POST-only
    ``setQuery`` is found before the one SPARQLStore would otherwise use.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialise the store by delegating once to the next ``__init__`` in
        the MRO.

        :param args: positional arguments forwarded unchanged (previously
            they were accepted but silently discarded).
        :param kwargs: keyword arguments forwarded unchanged.
        """
        # NeptuneWrapper defines no __init__, so for a plain NeptuneStore this
        # resolves to SPARQLStore.__init__.
        # NOTE(review): an earlier revision followed this with
        # super(SPARQLStore, self).__init__(**kwargs), re-running the
        # underlying wrapper initialiser with the same keywords. That call
        # presumably fails when no ``endpoint`` keyword is supplied and is
        # redundant otherwise -- confirm against the installed RDFLib version.
        super(NeptuneStore, self).__init__(*args, **kwargs)
class NeptuneUpdateStore(NeptuneStore, SPARQLUpdateStore):
    """
    SPARQLUpdateStore that inherits NeptuneWrapper's ``setQuery``.

    NeptuneStore is listed first in the bases, so the POST-only ``setQuery``
    defined on NeptuneWrapper takes precedence over the stock one.
    """

    def __init__(self, **kwargs):
        """
        Initialise the store through the initialiser that follows
        NeptuneStore in the MRO.

        :param kwargs: keyword arguments forwarded unchanged.
        """
        # The lookup deliberately starts *after* NeptuneStore, so that class's
        # own __init__ is skipped and the call lands on
        # SPARQLUpdateStore.__init__. (The class name here was previously
        # misspelled, which raised NameError on every instantiation.)
        # NOTE(review): an earlier revision also called
        # super(SPARQLUpdateStore, self).__init__(**kwargs) afterwards, passing
        # the same keywords to the base store initialiser a second time. That
        # presumably rejects update-only keywords and repeats work
        # SPARQLUpdateStore already does -- confirm against the installed
        # RDFLib version.
        super(NeptuneStore, self).__init__(**kwargs)
def connect(endpoint):
    """
    Build a NeptuneUpdateStore and open it against ``endpoint``.

    The same URL is supplied for both items of the tuple handed to
    ``open`` (presumably the query and the update endpoint -- confirm
    against the RDFLib store documentation).

    :param endpoint: URL of the SPARQL endpoint.
    :return: the opened NeptuneUpdateStore.
    """
    neptune = NeptuneUpdateStore()
    endpoint_pair = (endpoint, endpoint)
    neptune.open(endpoint_pair)
    return neptune
def main():
    """
    Test functionality by writing and reading some triples.

    Adds one labelled triple to the default graph, prints its label and
    removes it again; then adds the same triple to a fresh named graph,
    prints what the default graph and the named graph report, and finally
    removes the named graph. Requires a reachable SPARQL endpoint; replace
    the placeholder URL below before running.
    """
    endpoint = 'http://your-neptune-endpoint:8182/sparql'
    # Reuse the module's helper instead of duplicating the store setup.
    store = connect(endpoint)

    # Add triples to the default graph
    ng = Graph(store, identifier=NEPTUNE_DEFAULT_GRAPH)
    uri = URIRef("http://sample.org/hi")
    trip = (uri, RDFS.label, Literal("Statement"))
    ng.add(trip)
    print(ng.value(subject=uri, predicate=RDFS.label))

    # Remove these
    ng.remove(trip)

    # Add triples to a new named graph
    my_graph = URIRef("http://tmp.namespace.org/entity1")
    my_ng = Graph(store, identifier=my_graph)
    my_ng.add(trip)
    # NOTE(review): this reads from the default graph (ng), not my_ng --
    # presumably to show what the default graph reports after the triple was
    # moved to a named graph; confirm that this is intended.
    print(ng.value(subject=uri, predicate=RDFS.label))
    print(len(my_ng))

    # Clean up the temporary named graph.
    store.remove_graph(my_ng)
# Run the round-trip demonstration only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment