Skip to content

Instantly share code, notes, and snippets.

@ktilcu
Forked from habahut/gist:06a79a201c4c91a58ab4
Last active August 29, 2015 14:01
Show Gist options
  • Save ktilcu/15e8a6c72a9a0ad3288c to your computer and use it in GitHub Desktop.
Save ktilcu/15e8a6c72a9a0ad3288c to your computer and use it in GitHub Desktop.
//
//
///// Dashboard.js
var plinko = require('plinko'),
TPSAggregator = require('TPSAggregator'),
OtherAggragator = require('OtherAggragator');
plinko.listen(TPSAggregator);
plinko.listen(OtherAggragator);
// Plinko.js
!function(){
'use strict';
function Plinko(){
var self = this,
app = require('http').createServer(handler),
io = require('socket.io').listen(app),
fs = require('fs'),
EventEmitter = require('events').EventEmitter,
kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client('localhost:2181');
app.listen(9180);
self.aggs = [];
function handler (req, res) {
//no op
}
self.listen = function(Agg) {
// Instantiate
var agger = new Agg(EventEmitter);
// Store
self.aggs.push(agger);
// Setup Socket
self.socket = io
.on('connection', function (socket) {
//no op
});
// Setup Consumer
self.consumer = new Consumer(
client,
[{ topic: agger.topic, partition: 0 }],
{autoCommit: true}
);
consumer.on('message', function(message) {
EventEmitter.emit(agger.label, message);
});
setInterval(function(){
var payload = {};
for (var z = self.aggs.length - 1; z >= 0; z--) {
var ag =self.aggs[z];
payload[ag.label] = ag;
}
self.socket = io.send(JSON.strigify(payload));
},1000);
};
}
exports = Plinko;
}();
// TPSAggregator.js
// The aggregator is the unit for getting something done in this framework. The framework emits an event for each message.
// The aggragator need only listen for that event. The framework also exposes a socket to drop data to. It listens for
// an aggregator to emit an event to 'drop' a message.
exports = module.exports = TPSAggregator;
function TPSAggregator(plinko) {
var self = this;
self.label = "TPS";
self.topic = 'tps';
self.aggregators = {
agg1 : function(message){},
agg2 : function(message){},
lastagg : function(message){}
};
self.init = function(homepage, topicname) {
// any necessary init
};
plinko.on('message', function(message) {
// if you want to do something before message is sent to each of the agg units do it here.
for (var i = self.aggregators.length - 1; i >= 0; i--) {
self.aggregators[i].call(self, message);
}
});
self.init();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment