Created
November 9, 2014 05:09
-
-
Save brian-gates/8cbaebf8cd65cca57f78 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var oboe = require('oboe');
var Readable = require('stream').Readable;
var util = require('util');
var urlParser = require('url');
var normalize = require('./normalize-query-statement');
var request = require('request');
var _ = require('lodash');

// CypherStream is a function declaration further down in this file, so it is
// hoisted and already defined when this call wires up the prototype chain.
util.inherits(CypherStream, Readable);
/**
 * Recursively replace each node with its `data` property when one is available.
 *
 * - Falsy values (null, undefined, 0, '', false) are returned unchanged.
 * - A value with a truthy `data` property is replaced by the extracted form
 *   of that property.
 * - Any other object or array has each of its own enumerable properties
 *   replaced in place, and the same (mutated) reference is returned.
 * - Remaining primitives and functions are returned unchanged.
 *
 * @param {*} item - value to unwrap; objects and arrays are mutated in place
 * @returns {*} the unwrapped value
 */
function extractData(item) {
  if (!item) {
    return item;
  }
  if (item.data) {
    return extractData(item.data);
  }
  // typeof reports 'object' for arrays as well as plain objects (it never
  // yields 'array'), so this single comparison filters out everything else.
  if (typeof item !== 'object') {
    return item;
  }
  // recurse on each property
  Object.keys(item).forEach(function (key) {
    item[key] = extractData(item[key]);
  });
  return item;
}
/**
 * Readable object-mode stream that sends statements to the
 * `db/data/transaction` HTTP endpoint under `databaseUrl` and pushes one
 * object per result row, keyed by the result's column names.
 *
 * @param {string} databaseUrl - base URL of the database; may carry
 *   `user:password@` credentials, which are sent as HTTP basic auth.
 * @param {*} statements - passed through `normalize()`; entries flagged with
 *   `commit` / `rollback` set the matching option, and only entries with a
 *   truthy `statement` are sent.
 * @param {Object} [options]
 * @param {string|number} [options.transactionId] - existing transaction to use.
 * @param {boolean} [options.commit] - append `/commit` to the request URL.
 * @param {boolean} [options.rollback] - roll the transaction back (HTTP DELETE).
 *   When no `transactionId` is known the stream ends without any request.
 *   A caller-supplied options object is updated in place with any
 *   commit/rollback flags found in the statements.
 *
 * Emits `error` for transport failures and for each entry in the response's
 * `errors` array; the stream is ended afterwards in both cases.
 */
function CypherStream(databaseUrl, statements, options) {
  Readable.call(this, { objectMode: true });

  // Rows are pushed when the HTTP response arrives, so there is nothing to do
  // on demand. Assigned up front so it is also in place on the early return.
  this._read = function () { };

  // Options are optional; without this default every access below would throw.
  options = options || {};

  statements = normalize(statements).filter(function (statement) {
    if (statement.commit) {
      options.commit = true;
    }
    if (statement.rollback) {
      options.rollback = true;
    }
    return statement.statement;
  });

  // if a rollback is requested before a transactionId is acquired, we can quit early.
  if (options.rollback && !options.transactionId) {
    this.push(null);
    return this;
  }

  var self = this;
  var isRollback = Boolean(options.transactionId && options.rollback);

  var headers = {
    "X-Stream": true,
    "Accept": "application/json",
  };

  // add HTTP basic auth if needed
  var parsedUrl = urlParser.parse(databaseUrl);
  if (parsedUrl.auth) {
    headers['Authorization'] = 'Basic ' +
      Buffer.from(parsedUrl.auth).toString('base64');
  }

  if (databaseUrl[databaseUrl.length - 1] !== '/') {
    databaseUrl += '/'; // ensure trailing slash
  }

  var url = databaseUrl + 'db/data/transaction';
  if (options.transactionId) {
    url += '/' + options.transactionId;
  }
  // A rollback targets the transaction resource itself, never its /commit URL.
  if (options.commit && !isRollback) {
    url += '/commit';
  }

  var opt = {
    url     : url,
    method  : isRollback ? 'DELETE' : 'POST',
    headers : headers,
    body    : { statements: statements },
    json    : true,
  };

  // Call request() directly: request.post() would overwrite `method` with
  // POST and the rollback DELETE would never be sent.
  request(opt, function (err, response, body) {
    if (err) {
      self.emit('error', err);
      self.push(null);
      return;
    }

    // The body may be missing or not the expected JSON document.
    var results = (body && body.results) || [];
    results.forEach(function (result) {
      (result.data || []).forEach(function (data) {
        var extracted = data.row.map(extractData);
        self.push(_.zipObject(result.columns, extracted));
      });
    });

    // Surface server-reported failures instead of silently dropping them.
    var errors = (body && body.errors) || [];
    errors.forEach(function (error) {
      var failure = new Error(error.message);
      failure.code = error.code;
      self.emit('error', failure);
    });

    self.push(null);
  });
}
// Expose the stream constructor as the module's sole export.
module.exports = CypherStream;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment