|
"use strict"; |
|
|
|
// This code is based* on Comunica's SPARQL endpoint code |
|
// https://github.com/comunica/comunica/tree/master/packages/actor-init-sparql |
|
// Licensed under the MIT License by Ghent University / Ruben Taelman / Joachim Van Herwegen |
|
// |
|
// *It is mostly a copy-paste of that code |
|
|
|
// This script builds a SPARQL endpoint on top of an RDFJS source |
|
|
|
|
|
if (process.argv.length != 4) { |
|
console.log("This script needs to be called with"); |
|
console.log("node sparqlendpoint.js N3|wasm /path/to/the/dataset"); |
|
return; |
|
} |
|
|
|
/**
 * RDFJS store class selected by the first CLI argument.
 *
 * "n3" is matched case-insensitively, so the "N3" spelling shown in the usage
 * message selects the N3.js store. Any other value selects the store exported
 * by ./wasm_tree/wrappedtree.js.
 */
const Store = (() => {
    const backend = String(process.argv[2]).toLowerCase();
    if (backend === "n3") {
        return require("n3").Store;
    }
    return require("./wasm_tree/wrappedtree.js").Store;
})();
|
|
|
// Dataset to load: the second argument after the script name.
const [, , , SOURCE_FILE] = process.argv;
// TCP port the HTTP server listens on.
const PORT = 3001;
// When true, engine.invalidateHttpCache() runs before every /sparql request.
const INVALIDATE_CACHE = false;
// Milliseconds before an unfinished response is forcibly ended (~16.7 hours);
// presumably chosen this large so long benchmark queries are never cut off.
const TIMEOUT = 60 * 1000 * 1000;
|
const newEngine = require('@comunica/actor-init-sparql-rdfjs').newEngine; |
|
|
|
const ttl_read = require('@graphy/content.ttl.read'); |
|
|
|
const fs = require("fs"); |
|
const http = require("http"); |
|
const querystring = require("querystring"); |
|
const url = require("url"); |
|
const asynciterator_1 = require("asynciterator"); |
|
// tslint:disable:no-var-requires |
|
const quad = require('rdf-quad'); |
|
const { exit } = require("process"); |
|
|
|
/** |
|
* An HTTP service that exposes a Comunica engine as a SPARQL endpoint. |
|
*/ |
|
/**
 * An HTTP service that exposes a Comunica engine as a SPARQL endpoint.
 *
 * Only the `/sparql` path is served. GET and HEAD read the query from the
 * `query` URL parameter; POST reads it from the request body. An empty query
 * returns a SPARQL service description.
 */
class HttpServiceSparqlEndpoint {
    /**
     * @param {object} context Comunica query context passed to every
     *   `engine.query` call (in this script: `{ sources: [...] }`).
     */
    constructor(context) {
        this.context = context;
        // Milliseconds before an unfinished response is forcibly ended (see stopResponse).
        this.timeout = TIMEOUT;
        this.engine = newEngine();
    }

    /**
     * Loads SOURCE_FILE (parsed with the graphy Turtle reader) into a new Store,
     * then starts the HTTP server over that store.
     *
     * @param {module:stream.internal.Writable} stdout The output stream to log to.
     * @param {module:stream.internal.Writable} stderr The error stream to log errors to.
     * @param {(code: number) => void} exit The callback to invoke to stop the script.
     * @return {Promise<void>} Resolves once the server is listening, or after
     *   `exit(1)` has been called because loading or startup failed.
     */
    static runArgsInProcess(stdout, stderr, exit) {
        return new Promise((resolve) => {
            let failed = false;
            // Report a fatal error only once, even if several streams emit 'error'.
            const fail = (reason) => {
                if (failed) {
                    return;
                }
                failed = true;
                // Writable.write() accepts strings/buffers only, so stringify Errors first.
                const message = reason instanceof Error ? (reason.stack || reason.message) : String(reason);
                stderr.write(message + '\n');
                exit(1);
                resolve();
            };

            const fileStream = fs.createReadStream(SOURCE_FILE);
            // pipe() does not forward 'error' events, so each stage needs its own
            // handler. Otherwise a missing file or a Turtle syntax error crashes the process.
            fileStream.on('error', fail);
            const quadStream = fileStream.pipe(ttl_read());
            quadStream.on('error', fail);

            const store = new Store();
            let quadCount = 0;
            store.import(quadStream)
                .on('error', fail)
                // Counts 'data' events emitted while importing (presumably one per quad).
                .on('data', () => { quadCount += 1; })
                .on('end', () => {
                    console.error(quadCount);
                    const context = { sources: [{ type: 'rdfjsSource', value: store }] };
                    new HttpServiceSparqlEndpoint(context).run(stdout, stderr)
                        .then(resolve, fail);
                });
        });
    }

    /**
     * Start the HTTP service.
     * @param {module:stream.internal.Writable} stdout The output stream to log to.
     * @param {module:stream.internal.Writable} stderr The error stream to log errors to.
     * @return {Promise<void>} Resolves once the server is listening on PORT;
     *   rejects if the port cannot be bound.
     */
    async run(stdout, stderr) {
        const engine = await this.engine;
        // Determine the allowed media types for requests
        const mediaTypes = await engine.getResultMediaTypes();
        const variants = Object.keys(mediaTypes).map((type) => ({ type, quality: mediaTypes[type] }));

        // Start the server
        const server = http.createServer(this.handleRequest.bind(this, engine, variants, stdout, stderr));
        server.setTimeout(2 * this.timeout); // unreliable mechanism, set too high on purpose
        // Wait for the port to be bound, so errors such as EADDRINUSE reject this
        // promise instead of surfacing as an unhandled 'error' event.
        await new Promise((resolve, reject) => {
            server.once('error', reject);
            server.listen(PORT, () => {
                server.removeListener('error', reject);
                resolve();
            });
        });
        stderr.write('Server running on http://localhost:' + PORT + '/\n');
    }

    /**
     * Handles an HTTP request.
     * @param {ActorInitSparql} engine A SPARQL engine.
     * @param {{type: string; quality: number}[]} variants Allowed variants.
     * @param {module:stream.internal.Writable} stdout Output stream.
     * @param {module:stream.internal.Writable} stderr Error output stream.
     * @param {module:http.IncomingMessage} request Request object.
     * @param {module:http.ServerResponse} response Response object.
     */
    async handleRequest(engine, variants, stdout, stderr, request, response) {
        try {
            let mediaType = null;
            if (request.headers.accept && request.headers.accept !== '*/*') {
                // choose() yields an empty list when no variant is acceptable. Fall back
                // to null (the same as no Accept header) instead of throwing on `[0].type`.
                const [preferred] = require('negotiate').choose(variants, request);
                mediaType = preferred ? preferred.type : null;
            }

            // Verify the path
            const requestUrl = url.parse(request.url || '', true);
            if (requestUrl.pathname !== '/sparql') {
                stdout.write('[404] Resource not found\n');
                response.writeHead(404, { 'content-type': HttpServiceSparqlEndpoint.MIME_JSON, 'Access-Control-Allow-Origin': '*' });
                response.end(JSON.stringify({ message: 'Resource not found' }));
                return;
            }

            if (INVALIDATE_CACHE) {
                await engine.invalidateHttpCache();
            }

            // Parse the query, depending on the HTTP method
            switch (request.method) {
                case 'POST': {
                    const sparql = await this.parseBody(request);
                    await this.writeQueryResult(engine, stdout, stderr, request, response, sparql, mediaType, false);
                    break;
                }
                case 'HEAD':
                case 'GET': {
                    const sparql = requestUrl.query.query || '';
                    await this.writeQueryResult(engine, stdout, stderr, request, response, sparql, mediaType, request.method === 'HEAD');
                    break;
                }
                default:
                    // Log the raw URL string; `requestUrl` is an object and would print as [object Object].
                    stdout.write('[405] ' + request.method + ' to ' + request.url + '\n');
                    response.writeHead(405, { 'content-type': HttpServiceSparqlEndpoint.MIME_JSON, 'Access-Control-Allow-Origin': '*' });
                    response.end(JSON.stringify({ message: 'Incorrect HTTP method' }));
            }
        } catch (error) {
            // The http server ignores this async handler's promise. Without this catch,
            // a rejection would go unhandled and the client would wait until the socket times out.
            stdout.write('[500] Server error: ' + error + '\n');
            if (!response.headersSent) {
                response.writeHead(500, { 'content-type': HttpServiceSparqlEndpoint.MIME_PLAIN, 'Access-Control-Allow-Origin': '*' });
            }
            if (!response.writableEnded) {
                response.end('An internal server error occurred.\n');
            }
        }
    }

    /**
     * Writes the result of the given SPARQL query.
     * @param {ActorInitSparql} engine A SPARQL engine.
     * @param {module:stream.internal.Writable} stdout Output stream.
     * @param {module:stream.internal.Writable} stderr Error output stream.
     * @param {module:http.IncomingMessage} request Request object.
     * @param {module:http.ServerResponse} response Response object.
     * @param {string} sparql The SPARQL query string.
     * @param {string} mediaType The requested response media type.
     * @param {boolean} headOnly If only the header should be written.
     */
    async writeQueryResult(engine, stdout, stderr, request, response, sparql, mediaType, headOnly) {
        if (!sparql) {
            return this.writeServiceDescription(engine, stdout, stderr, request, response, mediaType, headOnly);
        }

        let result;
        try {
            result = await engine.query(sparql, this.context);
        } catch (error) {
            stdout.write('[400] Bad request\n');
            response.writeHead(400, { 'content-type': HttpServiceSparqlEndpoint.MIME_PLAIN, 'Access-Control-Allow-Origin': '*' });
            response.end(error.toString());
            return;
        }

        stdout.write('[200] ' + request.method + ' to ' + request.url + '\n');
        stdout.write(' Requested media type: ' + mediaType + '\n');
        stdout.write(' Received query: ' + sparql + '\n');

        if (headOnly) {
            response.writeHead(200, { 'content-type': mediaType, 'Access-Control-Allow-Origin': '*' });
            response.end();
            return;
        }

        // Serialize before sending the 200 header. If serialization fails, the 400
        // below can then still be sent; a second writeHead would throw ERR_HTTP_HEADERS_SENT.
        let data;
        try {
            data = (await engine.resultToString(result, mediaType)).data;
        } catch (error) {
            stdout.write('[400] Bad request, invalid media type\n');
            response.writeHead(400, { 'content-type': HttpServiceSparqlEndpoint.MIME_PLAIN, 'Access-Control-Allow-Origin': '*' });
            response.end('The response for the given query could not be serialized for the requested media type\n');
            return;
        }

        response.writeHead(200, { 'content-type': mediaType, 'Access-Control-Allow-Origin': '*' });
        data.on('error', (e) => {
            stdout.write('[500] Server error in results: ' + e + ' \n');
            response.end('An internal server error occurred.\n');
        });
        data.pipe(response);
        this.stopResponse(response, data);
    }

    /**
     * Writes a SPARQL service description of this endpoint (used when no query is given).
     * @param {ActorInitSparql} engine A SPARQL engine.
     * @param {module:stream.internal.Writable} stdout Output stream.
     * @param {module:stream.internal.Writable} stderr Error output stream.
     * @param {module:http.IncomingMessage} request Request object; its URL is the description's subject.
     * @param {module:http.ServerResponse} response Response object.
     * @param {string} mediaType The requested response media type.
     * @param {boolean} headOnly If only the header should be written.
     */
    async writeServiceDescription(engine, stdout, stderr, request, response, mediaType, headOnly) {
        stdout.write('[200] ' + request.method + ' to ' + request.url + '\n');
        stdout.write(' Requested media type: ' + mediaType + '\n');
        stdout.write(' Received query for service description. ' + '\n');

        if (headOnly) {
            response.writeHead(200, { 'content-type': mediaType, 'Access-Control-Allow-Origin': '*' });
            response.end();
            return;
        }

        const s = request.url;
        const sd = 'http://www.w3.org/ns/sparql-service-description#';
        const quads = [
            // Basic metadata
            quad(s, 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type', sd + 'Service'),
            quad(s, sd + 'endpoint', '/sparql'),
            quad(s, sd + 'url', '/sparql'),
            // Features
            quad(s, sd + 'feature', sd + 'BasicFederatedQuery'),
            quad(s, sd + 'supportedLanguage', sd + 'SPARQL10Query'),
            quad(s, sd + 'supportedLanguage', sd + 'SPARQL11Query'),
        ];

        // Serialize before sending the 200 header so the 400 fallback remains possible.
        let data;
        try {
            data = (await engine.resultToString({
                type: 'quads',
                quadStream: new asynciterator_1.ArrayIterator(quads),
            }, mediaType)).data;
        } catch (error) {
            stdout.write('[400] Bad request, invalid media type\n');
            response.writeHead(400, { 'content-type': HttpServiceSparqlEndpoint.MIME_PLAIN, 'Access-Control-Allow-Origin': '*' });
            response.end('The response for the given query could not be serialized for the requested media type\n');
            return;
        }

        response.writeHead(200, { 'content-type': mediaType, 'Access-Control-Allow-Origin': '*' });
        data.on('error', (e) => {
            stdout.write('[500] Server error in results: ' + e + ' \n');
            response.end('An internal server error occurred.\n');
        });
        data.pipe(response);
        this.stopResponse(response, data);
    }

    /**
     * Stop after timeout or if the connection is terminated
     * @param {module:http.ServerResponse} response Response object.
     * @param {NodeJS.ReadableStream} eventEmitter Query result stream.
     */
    stopResponse(response, eventEmitter) {
        // Note: socket or response timeouts seemed unreliable, hence the explicit timeout
        const killTimeout = setTimeout(killClient, this.timeout);
        response.on('close', killClient);

        function killClient() {
            if (eventEmitter) {
                // remove all listeners so we are sure no more write calls are made
                eventEmitter.removeAllListeners();
                // Re-add a no-op 'error' listener. An emitter with no 'error' listener
                // throws when an error is emitted, which would crash the process if the
                // abandoned result stream fails later.
                eventEmitter.on('error', () => { /* client is gone; ignore */ });
                eventEmitter.emit('end');
            }
            try {
                response.end();
            } catch (e) { /* ignore error */ }
            clearTimeout(killTimeout);
        }
    }

    /**
     * Parses the body of a SPARQL POST request
     * @param {module:http.IncomingMessage} request Request object.
     * @return {Promise<string>} A promise resolving to a query string: the
     *   `query` form field for application/x-www-form-urlencoded bodies, the raw
     *   body otherwise. Rejects if the request stream emits 'error'.
     */
    parseBody(request) {
        return new Promise((resolve, reject) => {
            let body = '';
            request.setEncoding('utf8');
            request.on('error', reject);
            request.on('data', (chunk) => { body += chunk; });
            request.on('end', () => {
                // Debug output: echo the raw request body to stderr.
                process.stderr.write(body);
                const contentType = request.headers['content-type'] || '';
                if (contentType.includes('application/sparql-query')) {
                    resolve(body);
                } else if (contentType.includes('application/x-www-form-urlencoded')) {
                    resolve(querystring.parse(body).query || '');
                } else {
                    resolve(body);
                }
            });
        });
    }
}
|
|
|
HttpServiceSparqlEndpoint.MIME_PLAIN = 'text/plain';
HttpServiceSparqlEndpoint.MIME_JSON = 'application/json';
// tslint:disable:max-line-length
// Describes the command line this script actually accepts (the text inherited
// from comunica-sparql-http listed options this script does not support).
// NOTE: nothing in this script prints HELP_MESSAGE; it is kept as reference text.
HttpServiceSparqlEndpoint.HELP_MESSAGE = `sparqlendpoint.js exposes a Comunica engine as a SPARQL endpoint over an in-memory RDFJS store

Usage:
  node sparqlendpoint.js N3|wasm /path/to/the/dataset

Arguments:
  N3|wasm                 The RDFJS store implementation to load the dataset into
  /path/to/the/dataset    The Turtle file to load

The endpoint is served at http://localhost:${PORT}/sparql
`;
|
|
|
// Entry point: load the dataset and start the endpoint. The exit callback
// forwards the requested status code (defaulting to 1) to the process.
HttpServiceSparqlEndpoint.runArgsInProcess(process.stdout, process.stderr, (code = 1) => process.exit(code));
I manually ran the following request
(one of the requests actually executed by BSBM):
I also added a console.log in the match function of the WASM tree store.
I built a SPARQL endpoint with the script in this gist.
I executed the request from http://yasgui.triply.cc/ against http://localhost:3001/sparql.
Here is part of the resulting log output:
The call
match(null, rdf.namedNode("http://www.w3.org/2000/01/rdf-schema#label"), null, null)
is performed several times for the same request, which may be one reason why using an RDFJS source with the Comunica SPARQL engine is inefficient.