Skip to content

Instantly share code, notes, and snippets.

@mommi84
Created February 16, 2020 18:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mommi84/ade8c441a144008cba882fc1e603bead to your computer and use it in GitHub Desktop.
Save mommi84/ade8c441a144008cba882fc1e603bead to your computer and use it in GitHub Desktop.
Generic simple SPARQL interface for Python v3.
#!/usr/bin/env python
import http, json, base64
from urllib.request import Request, urlopen
from urllib.parse import urlencode
from urllib.error import HTTPError
from time import time
class PySparql:
    """Small SPARQL-over-HTTP client that pages through a query's results.

    Every query is sent as an HTTP GET to ``endpoint`` with
    ``OFFSET``/``LIMIT`` appended, ``buffer_size`` rows at a time, and the
    JSON ``results.bindings`` lists of all pages are concatenated.

    Attributes:
        endpoint: URL of the SPARQL endpoint; the query string is appended
            after a ``?``.
        username, password: Optional credentials. HTTP Basic authentication
            is used only when both are not ``None``.
        buffer_size: Page size used for the ``LIMIT`` clause.
        param: Extra URL parameters sent with every request. Empty by
            default; callers may add entries (e.g. ``"timeout"``).
    """

    def __init__(self, endpoint, username=None, password=None, buffer_size=10000):
        """Store the connection settings.

        Raises:
            ValueError: If ``buffer_size`` is not positive. A page size of
                zero or less can never produce a "short" final page, so the
                paging loop would never terminate.
        """
        if buffer_size <= 0:
            raise ValueError("buffer_size must be a positive integer")
        self.buffer_size = buffer_size
        self.endpoint = endpoint
        self.username = username
        self.password = password
        # Extra request parameters, merged into every request's query string.
        self.param = {}

    def query(self, query, graph=""):
        """Run ``query`` against ``graph`` and return all result bindings.

        Args:
            query: SPARQL query text. ``OFFSET``/``LIMIT`` clauses are
                appended for paging, so the text must not end with its own.
            graph: Value sent as ``default-graph-uri`` (empty by default).

        Returns:
            A list with the ``results.bindings`` entries of every page.
        """
        return self._query_all(query, graph)

    def _query_all(self, query, graph):
        """Fetch successive pages until one comes back shorter than a page."""
        results = []
        page = 0
        while True:
            offset = page * self.buffer_size
            paged_query = "{} OFFSET {} LIMIT {}".format(query, offset, self.buffer_size)
            part = self._query_buffer(paged_query, graph)
            results += part
            # A short (or empty) page means the result set is exhausted.
            # A failed request also yields [] and therefore ends the paging.
            if len(part) < self.buffer_size:
                break
            page += 1
        return results

    def _build_headers(self):
        """Return the request headers, with Basic auth if credentials are set."""
        headers = {'Accept': 'application/json'}
        if self.username is not None and self.password is not None:
            # b64encode works on bytes and returns bytes: encode the
            # credentials first and decode the token back to text, otherwise
            # the call fails (str input) or the header reads "Basic b'...'".
            credentials = "{}:{}".format(self.username, self.password).encode('utf-8')
            token = base64.b64encode(credentials).decode('ascii')
            headers['Authorization'] = "Basic {}".format(token)
        return headers

    def _query_buffer(self, query, graph):
        """Send one request and return its ``results.bindings`` list.

        HTTP errors and malformed status lines are printed and reported as
        an empty list (best effort); other exceptions propagate.
        """
        # Work on a copy so per-request values never leak into self.param.
        params = dict(self.param)
        params["default-graph-uri"] = graph
        params["query"] = query  # urlencode percent-encodes str as UTF-8
        req = Request(self.endpoint + "?" + urlencode(params), None, self._build_headers())
        try:
            # The context manager closes the response even if read() fails.
            with urlopen(req) as resp:
                payload = resp.read()
        except (HTTPError, http.client.BadStatusLine) as error:
            print(error)
            return []
        return json.loads(payload)['results']['bindings']
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment