Skip to content

Instantly share code, notes, and snippets.

@mwatts15
Created October 24, 2015 07:26
Show Gist options
  • Save mwatts15/396f105fb81fe31d1269 to your computer and use it in GitHub Desktop.
Save mwatts15/396f105fb81fe31d1269 to your computer and use it in GitHub Desktop.
from __future__ import print_function
from socket import error as SocketError
import errno
import traceback
import rdflib
import time
import os
import signal
from urllib2 import URLError
from rdflib.plugins.stores.sparqlstore import SPARQLUpdateStore
from SPARQLWrapper.SPARQLExceptions import EndPointInternalError
class TimeoutException(Exception):
    """Signals that an operation ran longer than it was allowed to."""
def handler(signum, frame):
    """Signal handler that aborts the running operation.

    Meant to be installed with ``signal.signal(signal.SIGALRM, handler)``.
    ``signum`` and ``frame`` are required by the signal-handler protocol and
    are ignored.

    :raises TimeoutException: always.
    """
    message = "Took too long"
    raise TimeoutException(message)
def genstmts(base, count):
    """Return ``count`` generated triple statements as a single string.

    Statement ``i`` is ``<{base}s{i}><{base}p{i}><{base}o{i}> . `` and the
    statements are concatenated in order with no extra separator.

    :param base: IRI prefix used for subject, predicate and object.
    :param count: Number of statements to generate (0 gives ``''``).
    :returns: The concatenated statements.
    """
    print('genstmts')
    s = base + 's'
    p = base + 'p'
    o = base + 'o'
    # Collect the pieces and join once: repeated ``stmts = stmts + ...`` is
    # quadratic in the output size, which hurts for the large ``count``
    # values this script probes.
    parts = []
    for i in range(count):
        x = str(i)
        parts.append('<' + s + x + '><' + p + x + '><' + o + x + '> . ')
    return ''.join(parts)
def genstmts_ba(base, count, max_size=None):
    """Return ``count`` generated triple statements, built in a bytearray.

    Statement ``i`` is ``<{base}s{i}><{base}p{i}><{base}o{i}> . `` and the
    statements are concatenated in order. The text is accumulated as UTF-8
    bytes in a single mutable buffer and decoded once at the end.

    :param base: IRI prefix used for subject, predicate and object.
    :param count: Number of statements to generate (0 gives an empty string).
    :param max_size: Kept for backward compatibility. It used to be the
        initial buffer size; the buffer now grows on demand, so the value no
        longer affects the result (``max_size=0`` previously raised
        ZeroDivisionError).
    :returns: The concatenated statements, decoded from UTF-8.
    """
    print('genstmts (bytearray)')
    s = (base + 's').encode('UTF-8')
    p = (base + 'p').encode('UTF-8')
    o = (base + 'o').encode('UTF-8')
    ba = bytearray()
    for i in range(count):
        x = str(i).encode('UTF-8')
        # In-place ``+=`` on a bytearray appends with amortised O(1) growth,
        # so no manual index tracking or resize-and-retry is needed. Bytes
        # literals keep this valid on Python 3, where assigning ``str`` into
        # a bytearray raises TypeError.
        ba += b'<' + s + x + b'><' + p + x + b'><' + o + x + b'> . '
    return ba.decode('UTF-8')
def do_update(g, i, stmts):
    """Submit the SPARQL update ``stmts`` through graph ``g`` and commit it.

    The module-level ``TIMEOUT`` is applied to the graph's store just before
    the commit. ``i`` is accepted but not used here.

    NOTE(review): with the ``autocommit=False`` store built by
    ``setup_graph`` the request is presumably only sent on ``commit()`` --
    confirm against the rdflib version in use.
    """
    g.update(stmts)
    store = g.store
    store.setTimeout(TIMEOUT)
    store.commit()
def setup_graph(endpoint):
    """Return a ConjunctiveGraph opened on the SPARQL endpoint ``endpoint``.

    The backing SPARQLUpdateStore is created with ``postAsEncoded=False``
    and ``autocommit=False``, so callers must commit updates explicitly.
    """
    graph = rdflib.ConjunctiveGraph(
        SPARQLUpdateStore(postAsEncoded=False, autocommit=False))
    graph.open(endpoint)
    return graph
def main(args):
    """Probe the largest ``INSERT DATA`` request the endpoint accepts.

    Clears the store, then repeatedly inserts ``i`` generated statements.
    ``i`` doubles while inserts succeed. After a timeout (or
    EndPointInternalError) it bisects between the last success (``low``) and
    the failing size (``hi``) until ``i`` stops changing. Finally it prints
    the size reached, the length of the last request and the per-request
    delays.

    :param args: ``sys.argv``-style list; ``args[1]`` is the SPARQL endpoint.
    :returns: A usage message (suitable for ``sys.exit``) when the endpoint
        argument is missing, otherwise ``None``.
    """
    if len(args) < 2:
        return "Usage: {progname} SPARQL_ENDPOINT".format(progname=args[0])
    endpoint = args[1]
    g = setup_graph(endpoint)
    g.update("DELETE { ?x ?y ?z } WHERE { ?x ?y ?z }")
    # The store is opened with autocommit=False, so the DELETE above is only
    # queued. Send it now so it is neither bundled into (and timed with) the
    # first INSERT nor dropped if that first request fails and the graph is
    # recreated.
    g.store.commit()
    delays = []
    low = 1
    hi = 1
    i = 1
    last = 1
    stmts = ""
    # signal.signal(signal.SIGALRM, handler)
    while True:
        if i > hi:
            hi = i
        stmts = "INSERT DATA { " + genstmts_ba(BASE, i) + " }"
        try:
            print(i, hi, low)
            t0 = time.time()
            # signal.alarm(TIMEOUT)
            do_update(g, i, stmts)
            t1 = time.time()
            delays.append(t1 - t0)
            low = i
            if i == hi:
                # Still exploring upward: no failure seen at this size yet.
                i = i << 1
            else:
                # Floor division keeps ``i`` an int on Python 3 too.
                i = (i + hi) // 2
        except (URLError, EndPointInternalError) as e:
            if "time" in str(e) or isinstance(e, EndPointInternalError):
                print(".", i, hi, low)
                hi = i
                i = (hi + low) // 2
                # XXX: This exception probably breaks the connection with the
                # remote endpoint, so we force opening of a new one.
                g.close()
                g = setup_graph(endpoint)
            else:
                traceback.print_exc()
                break
        except SocketError as e:
            if e.errno != errno.ECONNRESET:
                raise
            g.close()
            g = setup_graph(endpoint)
            # Connection reset errors are (hopefully) transient, so we just
            # retry the same size with a new connection.
            continue
        except Exception:
            traceback.print_exc()
            break
        # The search has converged once the next size equals the last one.
        if last == i:
            break
        last = i
    print("max statements = {}".format(i))
    print(
        "max statement length = {} ({} kb)".format(
            len(stmts), len(
                stmts.encode('UTF-8')) / 1024.0))
    print(delays)
if __name__ == '__main__':
    import sys
    # Environment values are strings while the default is an int; convert
    # at the boundary so TIMEOUT is always an int (and a bad value fails
    # immediately instead of deep inside a request).
    TIMEOUT = int(os.environ.get('TIMEOUT', 30))
    BASE = os.environ.get('BASE', 'http://e.o/')
    sys.exit(main(sys.argv))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment