Skip to content

Instantly share code, notes, and snippets.

@brian-gates
Created November 9, 2014 05:09
Show Gist options
  • Save brian-gates/8cbaebf8cd65cca57f78 to your computer and use it in GitHub Desktop.
Save brian-gates/8cbaebf8cd65cca57f78 to your computer and use it in GitHub Desktop.
var oboe = require('oboe');
var Readable = require('stream').Readable;
var util = require('util');
var urlParser = require('url');
var normalize = require('./normalize-query-statement');
var request = require('request');
var _ = require('lodash');
// Make CypherStream instances inherit from stream.Readable. CypherStream is a
// function declaration further down in this file, so it is hoisted and already
// defined when this line runs.
util.inherits(CypherStream, Readable);
/**
 * Recursively replaces every node that carries a truthy `data` property with
 * that property's (recursively unwrapped) value.
 *
 * Objects and arrays are rewritten in place; the same reference is returned.
 *
 * @param {*} item - Any value: primitive, array or plain object.
 * @returns {*} The unwrapped value, or `item` itself when nothing was wrapped.
 */
function extractData(item) {
  // Falsy values (null, undefined, 0, '', false) have nothing to unwrap.
  if (!item) {
    return item;
  }

  // A truthy `data` property stands in for the whole node.
  if (item.data) {
    return extractData(item.data);
  }

  // `typeof` reports arrays as 'object', so one comparison covers both
  // containers; every other type is returned untouched.
  if (typeof item !== 'object') {
    return item;
  }

  // Unwrap each own enumerable property, overwriting it in place.
  var keys = Object.keys(item);
  for (var i = 0; i < keys.length; i += 1) {
    var key = keys[i];
    item[key] = extractData(item[key]);
  }
  return item;
}
/**
 * Readable object-mode stream that sends Cypher statements to the
 * `db/data/transaction` HTTP endpoint under `databaseUrl` and pushes one
 * object per result row, keyed by the result's column names.
 *
 * @param {string} databaseUrl - Base URL of the database; may embed
 *   `user:password@` credentials, which are sent as HTTP basic auth.
 * @param {*} statements - Statement input, passed through the file's
 *   `normalize` helper. Normalized entries may carry `commit` / `rollback`
 *   flags; entries without a `statement` are dropped from the request body.
 * @param {Object} [options] - Optional settings. Reads `transactionId`,
 *   `commit` and `rollback`. NOTE: `commit` / `rollback` are also set on this
 *   object when a statement requests them (kept for backward compatibility).
 *
 * Emits `error` on transport failures, malformed responses, and for each entry
 * in the response's `errors` array; the stream is ended afterwards.
 */
function CypherStream(databaseUrl, statements, options) {
  Readable.call(this, { objectMode: true });

  // Rows are pushed from the HTTP callback, so there is nothing to do on
  // demand. Installed first so that every code path, including the early
  // return below, yields a fully usable stream.
  this._read = function () { };

  // `options` is optional; without this default the flag handling below would
  // throw on `undefined`.
  options = options || {};

  statements = normalize(statements).filter(function (statement) {
    if (statement.commit) {
      options.commit = true;
    }
    if (statement.rollback) {
      options.rollback = true;
    }
    return statement.statement;
  });

  // If a rollback is requested before a transactionId is acquired, nothing was
  // ever opened on the server, so we can quit early.
  if (options.rollback && !options.transactionId) {
    this.push(null);
    return this;
  }

  var self = this;
  // Past the guard above, a rollback always has a transaction id.
  var isRollback = Boolean(options.transactionId && options.rollback);

  var headers = {
    "X-Stream": true,
    "Accept": "application/json",
  };

  // Add HTTP basic auth if the URL carries credentials.
  var parsedUrl = urlParser.parse(databaseUrl);
  if (parsedUrl.auth) {
    // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
    headers['Authorization'] = 'Basic ' +
      Buffer.from(parsedUrl.auth).toString('base64');
  }

  if (databaseUrl[databaseUrl.length - 1] !== '/') {
    databaseUrl += '/'; // ensure trailing slash
  }
  var url = databaseUrl + 'db/data/transaction';
  if (options.transactionId) {
    url += '/' + options.transactionId;
  }
  // A rollback is a DELETE on the transaction resource itself, so the commit
  // suffix only applies when we are not rolling back.
  if (options.commit && !isRollback) {
    url += '/commit';
  }

  var opt = {
    url     : url,
    method  : isRollback ? 'DELETE' : 'POST',
    headers : headers,
    body    : { statements: statements },
    json    : true,
  };

  // Report a failure and end the stream so consumers waiting on 'end' do not hang.
  function fail(error) {
    self.emit('error', error);
    self.push(null);
  }

  // Call `request` directly: the `request.post` shortcut forces the POST verb
  // and would silently ignore the DELETE chosen for rollbacks.
  request(opt, function (err, response, body) {
    if (err) {
      fail(err);
      return;
    }
    if (!body || typeof body !== 'object') {
      var status = response && response.statusCode;
      fail(new Error('Unexpected response from database' +
        (status ? ' (HTTP ' + status + ')' : '')));
      return;
    }

    var results = Array.isArray(body.results) ? body.results : [];
    var errors = Array.isArray(body.errors) ? body.errors : [];

    results.forEach(function (result) {
      result.data.forEach(function (data) {
        var extracted = data.row.map(extractData);
        self.push(_.zipObject(result.columns, extracted));
      });
    });

    // Surface server-side statement errors instead of dropping them.
    errors.forEach(function (error) {
      var failure = new Error(error.message || 'Query Failure');
      failure.code = error.code;
      self.emit('error', failure);
    });

    self.push(null);
  });
}
// Expose the CypherStream constructor as this module's export.
module.exports = CypherStream;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment