Skip to content

Instantly share code, notes, and snippets.

@FLYBYME
Created June 20, 2014 20:43
Show Gist options
  • Save FLYBYME/895532bd7d6415298624 to your computer and use it in GitHub Desktop.
Save FLYBYME/895532bd7d6415298624 to your computer and use it in GitHub Desktop.
TimeSeries
var _ = require("underscore");
var mongoose = require('mongoose'), Schema = mongoose.Schema, Mixed = mongoose.Schema.Types.Mixed;
var options = {
db : {
native_parser : true
},
server : {
poolSize : 15
}
};
mongoose.connect('mongodb://localhost/data/db', options);
/**
* Module dependencies.
*/
var _ = require("underscore");
var mongoose = require('mongoose'), Schema = mongoose.Schema, Mixed = mongoose.Schema.Types.Mixed;
var fs = require('fs');
var options = {
actor : 0,
interval : 1, // seconds
millisecond : false,
verbose : false,
postProcessImmediately : false,
paths : {
value : {
type : 'number'
},
metadata : {
type : Mixed
}
}
};
var roundDay = function(d) {
var t = new Date(d.getFullYear(), d.getMonth(), d.getDate());
return t;
};
/**
* Schema definition
*/
var TimeSeries = new Schema({
day : {
type : Date,
index : true,
required : true
},
metadata : {
interval : {
type : Number
}
},
group : {
type : String,
required : true,
'default' : 'default'
},
name : String,
token : String,
latest : {
timestamp : {
type : Date
},
value : {
type : Number,
'default' : 0
},
metadata : {
type : Mixed
},
},
createdAt : {
date : {
type : Date,
'default' : Date
},
user : {
type : String
}
},
updatedAt : {
date : {
type : Date
},
user : {
type : String
}
},
statistics : {
i : {
type : Number,
'default' : 0
},
avg : {
type : Number
},
max : {
value : {
type : Number
},
timestamp : {
type : Date
}
},
min : {
value : {
type : Number
},
timestamp : {
type : Date
}
}
},
seconds : [Schema.Types.Mixed]
});
/**
* Post hook.
*/
TimeSeries.pre('save', function(next) {
//console.log(this);
if (this.isNew) {
console.log('saving new..');
this.metadata.interval = options.interval;
this.statistics.i = 1;
if (this.latest) {
this.statistics.min.value = this.latest.value;
this.statistics.min.timestamp = this.latest.timestamp;
this.statistics.max.value = this.latest.value;
this.statistics.max.timestamp = this.latest.timestamp;
this.statistics.avg = this.latest.value;
}
} else {
console.log('updating old..');
}
next();
});
var dataFormat = function(timestamp, value, format, ext) {
switch(format) {
case('[ms,y]'):
return [timestamp.getTime(), value]
case('[x,y]'):
return [timestamp, value]
default:
case('hash'):
return _.extend({
timestamp : timestamp,
value : value
}, ext);
}
};
/**
* Virtual methods
*/
TimeSeries.method('getData', function(interval, format) {
var data = [];
var year = this.day.getFullYear();
var month = this.day.getMonth();
var day = this.day.getDate();
var dateObj = new Date()
if (interval == 'second') {
for (var hour = 0; hour < this.seconds.length; hour++) {
for (var minute = 0; minute < this.seconds[hour].length; minute++) {
for (var second = 0; second < this.seconds[hour][minute].length; second++) {
if (roundDay(dateObj).getTime() == roundDay(this.day).getTime()) {
if (hour == dateObj.getHours() && minute == dateObj.getMinutes() && second == dateObj.getSeconds()) {
return data;
}
}
var timestamp = new Date(year, month, day, hour, minute, second);
var d = this.seconds[hour][minute][second];
data.push([timestamp.getTime(), d ? d.value : d]);
if (roundDay(dateObj).getTime() == roundDay(this.day).getTime()) {
if (hour == dateObj.getHours() && minute == dateObj.getMinutes() && second == dateObj.getSeconds()) {
return data;
}
}
};
};
};
}
if (interval == 'minute') {
for (var hour = 0; hour < this.seconds.length; hour++) {
for (var minute = 0; minute < this.seconds[hour].length; minute++) {
var secondSum = null;
for (var second = 0; second < this.seconds[hour][minute].length; second++) {
var d = this.seconds[hour][minute][second];
if (d) {
if (secondSum == null)
secondSum = 0;
secondSum += d.value;
}
};
var timestamp = new Date(year, month, day, hour, minute, 0);
data.push([timestamp.getTime(), secondSum]);
};
};
}
if (interval == 'hour') {
for (var hour = 0; hour < this.seconds.length; hour++) {
var minuteSum = null;
for (var minute = 0; minute < this.seconds[hour].length; minute++) {
var secondSum = null;
for (var second = 0; second < this.seconds[hour][minute].length; second++) {
var d = this.seconds[hour][minute][second];
if (d) {
if (secondSum == null)
secondSum = 0;
if (minuteSum == null)
minuteSum = 0;
secondSum += d.value;
}
};
if (secondSum != null)
minuteSum += secondSum;
};
var timestamp = new Date(year, month, day, hour, 0, 0);
data.push([timestamp.getTime(), minuteSum]);
};
}
if (interval == 'day') {
var hourSum = null;
for (var hour = 0; hour < this.seconds.length; hour++) {
var minuteSum = null;
for (var minute = 0; minute < this.seconds[hour].length; minute++) {
var secondSum = null;
for (var second = 0; second < this.seconds[hour][minute].length; second++) {
var d = this.seconds[hour][minute][second];
if (d) {
if (secondSum == null)
secondSum = 0;
if (minuteSum == null)
minuteSum = 0;
if (hourSum == null)
hourSum = 0;
secondSum += d.value;
}
};
if (secondSum != null)
minuteSum += secondSum;
};
if (minuteSum != null)
hourSum += minuteSum;
};
var timestamp = new Date(year, month, day, hour, 0, 0);
data.push([timestamp.getTime(), hourSum]);
}
return data;
});
TimeSeries.method('minmax', function(timestamp, value) {
var updates = {}, needToSave = false;
if (_.isNumber(this.statistics.max.value)) {
if (value > this.statistics.max.value) {
updates['statistics.max.timestamp'] = timestamp;
updates['statistics.max.value'] = value;
needToSave = true;
}
} else {
updates['statistics.max.timestamp'] = timestamp;
updates['statistics.max.value'] = value;
needToSave = true;
}
if (_.isNumber(this.statistics.min.value)) {
if (value < this.statistics.min.value) {
updates['statistics.min.timestamp'] = timestamp;
updates['statistics.min.value'] = value;
needToSave = true;
}
} else {
updates['statistics.min.timestamp'] = timestamp;
updates['statistics.min.value'] = value;
needToSave = true;
}
if (needToSave) {
this.set(updates);
this.save(function(error, ok) {
if (error)
console.log(error);
});
}
});
/**
* Static methods
*/
TimeSeries.static('findMax', function(conditions, callback) {
var condition = {
'$and' : [{
'day' : {
$gte : conditions.from
}
}, {
'day' : {
$lte : conditions.to
}
}],
'group' : conditions.group,
'name' : conditions.name
};
//console.log('findMax: '+JSON.stringify(condition));
this.find(condition).limit(1).select('statistics.max').sort({
'statistics.max.value' : -1
}).exec(function(error, doc) {
if (error)
callback(error)
else if (doc.length == 1) {
callback(null, doc[0].statistics.max);
} else
callback(null, NaN);
});
});
TimeSeries.static('findMin', function(conditions, callback) {
var condition = {
'$and' : [{
'day' : {
$gte : conditions.from
}
}, {
'day' : {
$lte : conditions.to
}
}],
'group' : conditions.group,
'name' : conditions.name
};
console.log('findMin: ' + JSON.stringify(condition));
this.find(condition).limit(1).select('statistics.min').sort({
'statistics.min.value' : 1
}).exec(function(error, doc) {
if (error)
callback(error);
else if (doc.length == 1) {
//console.log(doc);
callback(null, doc[0].statistics.min);
} else
callback(null, NaN);
});
});
TimeSeries.static('findData', function(request, callback) {
var condition = {
'$and' : [],
'group' : request.group,
'name' : request.name
};
_.extend(condition, request.condition);
if (!request.to)
request.to = new Date();
if (!request.dir)
request.dir = 1;
if (Object.keys(request.condition).length > 0)
condition['$and'].push(request.condition);
condition['$and'].push({
'day' : {
'$gte' : request.from,
}
});
condition['$and'].push({
'day' : {
'$lte' : request.to,
}
});
if (options.verbose) {
console.log(request);
console.log(JSON.stringify(condition));
}
this.find(condition).sort({
'day' : request.dir
}).exec(function(error, docs) {
if (error) {
callback(error);
} else {
if (options.verbose)
console.log('Doc count: ' + docs.length);
var data = [], i;
docs.forEach(function(doc) {
doc.getData(request.interval, request.format).forEach(function(row) {
data.push(row);
});
});
callback(null, data);
}
});
});
function getInitializer() {
var updates = {};
updates.seconds = [];
for (var i = 0; i < 24; i++) {
updates.seconds[i] = [];
for (var j = 0; j < 60; j++) {
updates.seconds[i][j] = [];
//initialize length
updates.seconds[i][j][59] = null;
//initialize length
}
}
return updates;
}
function getUpdates(timestamp, value, metadata, first) {
var updates = {}
var set = {
value : value
};
updates['seconds.' + timestamp.getHours() + '.' + timestamp.getMinutes() + '.' + timestamp.getSeconds()] = set
//statistics
updates['updatedAt.date'] = new Date();
updates['latest.timestamp'] = timestamp;
updates['latest.value'] = value;
updates['$inc'] = {
'statistics.i' : 1
};
return updates;
}
TimeSeries.method('minmax', function(timestamp, value) {
var updates = {}, needToSave = false;
if (_.isNumber(this.statistics.max.value)) {
if (value > this.statistics.max.value) {
updates['statistics.max.timestamp'] = timestamp;
updates['statistics.max.value'] = value;
needToSave = true;
}
} else {
updates['statistics.max.timestamp'] = timestamp;
updates['statistics.max.value'] = value;
needToSave = true;
}
if (_.isNumber(this.statistics.min.value)) {
if (value < this.statistics.min.value) {
updates['statistics.min.timestamp'] = timestamp;
updates['statistics.min.value'] = value;
needToSave = true;
}
} else {
updates['statistics.min.timestamp'] = timestamp;
updates['statistics.min.value'] = value;
needToSave = true;
}
if (needToSave) {
this.set(updates);
this.save(function(error, ok) {
if (error)
console.log(error);
});
}
});
TimeSeries.method('recalc', function(timestamp, value, cb) {
var updates = {}
var sum = 0, i = 0, hour, min, sec, ms;
if (options.verbose)
console.log('s recalc');
sum = 0;
i = 0;
for (hour in this.seconds) {
if (isNaN(parseInt(hour)))
break;
for (min in this.seconds[hour]) {
if (isNaN(parseInt(min)))
break;
for (sec in this.seconds[hour][min]) {
if (isNaN(parseInt(sec)))
break;
if (!this.seconds[hour][min][sec])
continue;
if (isNaN(parseInt(this.seconds[hour][min][sec].value)))
continue;
sum += this.seconds[hour][min][sec].value;
i++;
}
}
}
if (i <= 0)
i = 1;
if (i > 0) {
if (_.isNumber(sum)) {
updates['statistics.avg'] = sum / i;
}
}
if (options.verbose)
console.log(updates);
this.set(updates);
this.save(cb);
});
TimeSeries.static('recalc', function(timestamp, extraCondition, cb) {
var day = roundDay(timestamp);
var condition = {
'day' : day
};
_.extend(condition, extraCondition);
this.findOne(condition, function(e, doc) {
if (e) {
cb(e);
} else {
doc.recalc(timestamp, doc.latest.value, cb);
}
});
});
TimeSeries.method('push', function(timestamp, value, cb) {
var ts = new Date(timestamp);
var h = ts.getHours(), m = ts.getMinutes(), s = ts.getSeconds();
this.set('daily', value);
this.set('hourly.' + h + '', value);
this.set('minute.' + h + '.' + m, value);
this.set('minute.' + h + '.' + m + '.' + s, value);
this.save(cb);
});
TimeSeries.static('push', function(timestamp, value, metadata, cb) {
var day = roundDay(timestamp);
var condition = {
'day' : day,
group : metadata.group,
name : metadata.name
};
var updates = getUpdates(timestamp, value);
var self = this;
if (options.verbose)
console.log('\nCond: ' + JSON.stringify(condition));
if (options.verbose)
console.log('Upda: ' + JSON.stringify(updates));
this.findOneAndUpdate(condition, updates, function(error, doc) {
if (error) {
if (cb)
cb(error);
} else if (doc) {
//doc.minmax(timestamp, value);
if (cb)
cb(null, doc);
//doc.recalc(timestamp, value);
} else {
//console.log('Create new');
var datainit = getInitializer();
var doc = new self({
day : day,
group : metadata.group,
name : metadata.name
});
doc.set(datainit);
doc.set(updates);
doc.save(cb);
}
});
});
/****
*
*
*
*/
var TimeSeriesModel = function(collection, options) {
var model, schema;
/**
* Methods
*/
/**
* Model initialization
*/
function init(collection, options) {
if (mongoose.connection.modelNames().indexOf(collection) >= 0) {
model = connection.model(collection);
} else {
model = mongoose.model(collection, TimeSeries);
}
}
/**
* Push new value to collection
*/
var push = function push(timestamp, value, metadata, cb) {
model.push(timestamp, value, metadata, cb);
};
/**
* Find data of given period
*/
var findData = function(options, cb) {
model.findData(options, cb);
};
/**
* Find Max value of given period
*/
var findMax = function(options, cb) {
model.findMax(options, cb);
};
/**
* Find Min value of given period
*/
var findMin = function(options, cb) {
model.findMin(options, cb);
};
var getModel = function() {
return model;
};
init(collection, options);
/* Return model api */
return {
push : push,
findData : findData,
findMax : findMax,
findMin : findMin,
model : model
};
};
var ts = TimeSeriesModel('my-token', {
interval : 1
});
var usage = require('usage');
var pid = process.pid;
setInterval(function() {
usage.lookup(pid, {
keepHistory : true
}, function(err, result) {
ts.push(new Date(), result.cpu, {
group : 'usage',
name : 'cpu'
});
ts.push(new Date(), result.memory, {
group : 'usage',
name : 'memory'
});
});
}, 1000);
(function() {
var express = require('express');
var app = module.exports = express.createServer();
app.configure(function() {
app.use(express.bodyParser());
app.use(express.methodOverride());
app.use(app.router);
app.all('*', function(req, res, next) {
res.header("Access-Control-Allow-Origin", "*");
res.header('Access-Control-Allow-Methods', 'GET');
res.header("Access-Control-Allow-Headers", "X-Requested-With");
next();
});
});
app.configure('production', function() {
app.use(express.errorHandler());
});
// Routes
app.get('/', function(req, res) {
ts.findData({
condition : {},
from : new Date(new Date() - 1000 * 60 * 60 * 48),
to : new Date(),
group : 'usage',
name : 'cpu',
interval : req.query.interval || 'second'//day,hour,minute,second
}, function(error, data) {
if (error)
console.log(error);
else
console.log('len: ' + data.length);
res.setHeader('x-count', data.length)
res.send(data)
});
});
app.get('/find', function(req, res) {
ts.model.find({}, function(err, docs) {
res.send(docs)
});
});
app.get('/min', function(req, res) {
ts.findMin({
group : 'usage',
name : 'cpu',
from : new Date(new Date() - 1000 * 60 * 60 * 24),
to : new Date()
}, function(error, data) {
if (error)
console.log(error);
else
res.send(data)
});
});
app.get('/max', function(req, res) {
ts.findMax({
group : 'usage',
name : 'cpu',
from : new Date(new Date() - 1000 * 60 * 60 * 24),
to : new Date()
}, function(error, data) {
if (error)
console.log(error);
else
res.send(data)
});
});
app.get('/minmax', function(req, res) {
ts.findMax({
from : new Date(new Date() - 1000 * 60 * 60 * 24),
to : new Date()
}, function(error, data) {
if (error)
console.log(error);
else
res.send(data)
});
});
app.listen(3001);
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment