Skip to content

Instantly share code, notes, and snippets.

@tapmodo
Last active October 29, 2015 02:50
Show Gist options
  • Save tapmodo/19d0567b3e5bfe95c350 to your computer and use it in GitHub Desktop.
Save tapmodo/19d0567b3e5bfe95c350 to your computer and use it in GitHub Desktop.
This is a first stab at a batch uploader for InfluxDB. It seems to work. If you want to use it for any other queue-batching operation, you can override the write() and _send() methods.
var _ = require('lodash');
var debug = require('debug')('influx:batch');
/**
 * Batching writer: points passed to write() are queued and handed to the
 * database client in chunks, at most one write in flight at a time.
 *
 * @param {Object} db - client exposing writePoints(series, points, opts, callback).
 * @param {Object} [options] - overrides for SeriesWriter.defaults.
 */
var SeriesWriter = function(db,options){
  this.db = db;
  this.setOptions(options);
  this.sending = false;       // true while a writePoints call is in flight
  this.flushPending = false;  // a flush was requested while a write was in flight
  this.timer = null;          // handle of the pending retry/flush timeout, if any
  this.queue = [];            // pending [values, tags] pairs, oldest first
};

/**
 * Default options, merged under the caller's options by setOptions().
 */
SeriesWriter.defaults = {
  series: '_internal',  // series name passed to db.writePoints
  interval: 2000,       // delay (ms) before a deferred trigger/flush is retried
  chunkSize: 500,       // maximum number of points per batch (expected to be positive)
  chunkMin: 100,        // queue length at which a batch is sent without waiting
  autoTime: true,       // stamp points that carry no `time` with the current time
  write_opts: { },      // options object passed through to db.writePoints
  onError: null         // optional function(err, points); when absent, write errors are thrown
};

SeriesWriter.prototype = {
  // Assigning a fresh prototype object drops the implicit back-reference.
  constructor: SeriesWriter,

  /**
   * Queue one point and send a batch if the queue is large enough.
   *
   * When autoTime is enabled and `values` is an object without a `time`
   * (null/undefined), `values.time` is set in place to the current ISO
   * timestamp. A supplied time of 0 is kept; non-object values are queued as is.
   *
   * @param {*} values - field values of the point.
   * @param {Object} [tags] - tags of the point.
   * @returns {SeriesWriter} this, for chaining.
   */
  write: function(values,tags){
    var stampable = values !== null && typeof values === 'object';
    if (this.options.autoTime && stampable && values.time == null) {
      values.time = (new Date()).toISOString();
    }
    this.queue.push([ values, tags ]);
    this._trigger();
    return this;
  },

  /**
   * Replace the current options with defaults overlaid by `options`.
   *
   * @param {Object} [options]
   * @returns {SeriesWriter} this, for chaining.
   */
  setOptions: function(options){
    this.options = _.assign({},SeriesWriter.defaults,options || {});
    return this;
  },

  /**
   * @returns {boolean} true when the queue has reached either threshold;
   *   in practice the smaller of chunkSize and chunkMin decides.
   */
  canSend: function(){
    return (this.queue.length >= this.options.chunkSize) ||
      (this.queue.length >= this.options.chunkMin);
  },

  /**
   * Send the next batch regardless of chunkMin. If a write is already in
   * flight, the flush is remembered and performed once that write completes.
   *
   * @returns {SeriesWriter} this, for chaining.
   */
  flush: function(){
    this._trigger(true);
    return this;
  },

  /**
   * (Re)schedule a single deferred _trigger call after options.interval ms.
   *
   * @param {boolean} [flush] - forwarded to _trigger when the timer fires.
   */
  _delay: function(flush){
    var self = this;
    if (this.timer) clearTimeout(this.timer);
    this.timer = setTimeout(function(){
      // Drop the handle first so a fired timer is never mistaken for a pending one.
      self.timer = null;
      self._trigger(flush);
    },this.options.interval);
  },

  /**
   * Decide whether to send a batch now or defer.
   *
   * @param {boolean} [flush] - send even if the queue is below the thresholds.
   */
  _trigger: function(flush){
    if (!this.queue.length) return;

    // Never overlap writes: remember a requested flush and retry later.
    if (this.sending) {
      if (flush) this.flushPending = true;
      return this._delay(this.flushPending);
    }

    var force = flush || this.flushPending;
    // Too few points: wait one interval, then flush whatever has accumulated.
    if (!force && !this.canSend()) return this._delay(true);

    // A batch is going out now, so any scheduled retry/flush is obsolete.
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    this.flushPending = false;

    // Take up to chunkSize of the oldest points off the front of the queue.
    var points = this.queue.splice(0,this.options.chunkSize);
    this.sending = true;
    this._send(points);
  },

  /**
   * Write one batch through the database client, then look for more work.
   *
   * On error, options.onError(err, points) is called when it is a function
   * and draining continues; otherwise the error is thrown from the callback.
   *
   * @param {Array} points - [values, tags] pairs to write.
   */
  _send: function(points){
    debug('sending %s points',points.length);
    var self = this;
    var series = this.options.series;
    var opts = this.options.write_opts;
    this.db.writePoints(series,points,opts,function(err,data){
      self.sending = false;
      if (err) {
        if (typeof self.options.onError !== 'function') throw err;
        self.options.onError(err,points);
      }
      self._trigger();
    });
  }
};
// Expose the SeriesWriter constructor as this module's export.
module.exports = SeriesWriter;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment