Skip to content

Instantly share code, notes, and snippets.

@habahut
Created May 9, 2014 01:11
Show Gist options
  • Save habahut/06a79a201c4c91a58ab4 to your computer and use it in GitHub Desktop.
Save habahut/06a79a201c4c91a58ab4 to your computer and use it in GitHub Desktop.
data viz beginnings
My understanding so far:
one object per topic: for example TPS could be called the TPSTracker
Each object will have many little processes it is forwarding data to, so for example in TPS there would be a little process for "# of jobs started", "# of jobs on each TPS server" etc...
the TPSTracker will forward each kafka message it receives to each of the little entities, which will maintain their own state on the aggregate totals for their piece, throwing away the rest. So the TPS load piece would receive the entire TPS message from kafka, and only update the state for the load on the TPS servers, ignoring everything else.
Then, we stringify the state of each individual piece and send that to the client, and allow them to put that data into the charts on the client's page.
btw Kyle your idea of not bothering to verify producer output makes a lot more sense when we look at it this way, though I still think we should enforce some regularity :)
// Server bootstrap: serve index.html through `handler` (defined below),
// attach socket.io to the same HTTP server, and listen on port 9180.
var http = require('http');
var app = http.createServer(handler);
var io = require('socket.io').listen(app);
var fs = require('fs');

app.listen(9180);
/**
 * Serve the dashboard page for every HTTP request.
 * Reads ./index.html from disk on each hit and streams it back verbatim;
 * answers 500 with a plain-text message if the file cannot be read.
 */
function handler(req, res) {
  var page = __dirname + '/index.html';
  fs.readFile(page, function (err, data) {
    if (err) {
      res.writeHead(500);
      res.end('Error loading index.html');
      return;
    }
    res.writeHead(200);
    res.end(data);
  });
}
// ------------- start config --------------

// Trackers that opted in to periodic aggregation (see createTopicListener).
var aggregators = [];
var aggregationInterval = 5000; // 5 seconds

// to set this to listen to a new topic, all you have to do is add the
// javascript code for that topic below, and then call createTopicListener
// with a tracker object.
// BUG FIX: the original called `new tpsTracker(...)`, but no constructor of
// that name exists anywhere in this file — the tracker is `tpsLoadTracker`,
// so this line threw a ReferenceError on startup.
createTopicListener(new tpsLoadTracker('/tps', 'TPS_REPORTS'));

// start aggregation: periodically ask every registered tracker to roll up
// and publish its state to connected clients.
setInterval(function () {
  for (var i = 0; i < aggregators.length; i++) {
    aggregators[i].aggregate();
  }
}, aggregationInterval);
// ------------- end config ----------------
/**
 * Wire a tracker object up to its socket.io namespace and its Kafka topic.
 *
 * A tracker is expected to expose: `homepage` (socket.io namespace path),
 * `topicname` (Kafka topic name), `welcome(socket)`, `onMessage(message)`,
 * and — when `hasAggregator === true` — an `aggregate()` method that the
 * periodic aggregation loop will call.
 */
function createTopicListener(trackerObj) {
  //if (typeof(trackerObj) === 'undefined') { console.log('trackerObj is undefined!'); return }

  // Greet every browser that connects to this tracker's namespace.
  io.of(trackerObj.homepage).on('connection', function (socket) {
    trackerObj.welcome(socket);
  });

  // Consume this tracker's Kafka topic and forward every message to it.
  var kafka = require('kafka-node');
  var client = new kafka.Client('localhost:2181');
  var consumer = new kafka.Consumer(
    client,
    [{ topic: trackerObj.topicname, partition: 0 }],
    { autoCommit: true }
  );
  consumer.on('message', function (message) {
    trackerObj.onMessage(message);
  });

  // Opt-in registration with the periodic aggregation loop.
  if (trackerObj.hasAggregator === true) {
    aggregators.push(trackerObj);
  }
}
// TODO: add inheritance to make adding new stuff easier to manage!

/**
 * Tracker for the TPS topic.
 *
 * Maintains a running count of in-flight jobs from "T1" (job started) and
 * "T2" (job finished) messages, and fans each raw message out to every
 * client connected to its socket.io namespace.
 *
 * Fixes over the original sketch:
 *  - removed the stray `tpsRunningJbosT` token, which threw a
 *    ReferenceError the moment the constructor ran;
 *  - removed calls to undefined `tpsLoad.add` / `runningJobs.add`, which
 *    threw a ReferenceError on every incoming message;
 *  - `currentJobs` is used as a counter (`+= 1` / `-= 1`), so it is now
 *    initialized to 0 instead of [] (array += 1 silently produces a string);
 *  - corrected the `hasAgregator` typo to `hasAggregator`, the flag that
 *    createTopicListener actually checks — without this the tracker was
 *    never registered for aggregation;
 *  - `aggregate()` referenced an undefined `tps`; it now emits on the
 *    tracker's own namespace, which also removes the hard-coded '/tps'
 *    in onMessage.
 */
function tpsLoadTracker(homepage, topicname) {
  var self = this;

  self.init = function (homepage, topicname) {
    self.homepage = homepage;       // socket.io namespace, e.g. '/tps'
    self.topicname = topicname;     // Kafka topic name
    self.longestTask = '';
    self.averageLength = 0;
    self.currentJobs = 0;           // count of jobs currently running
    self.currentJobsMax = 20;       // NOTE(review): unused ring-buffer sketch
    self.currentJobsIndex = 0;      // NOTE(review): unused ring-buffer sketch
    self.allMessages = {};
    self.newMessages = {};
    self.hasAggregator = true;      // register with the aggregation loop
  };

  // Called once per browser connection to this tracker's namespace.
  self.welcome = function (socket) {
    // push aggregations to viewer!!!
    socket.emit('aggregates', 'welcome');
  };

  // Called for every Kafka message on this tracker's topic.
  self.onMessage = function (message) {
    var o = JSON.parse(message);
    self.newMessages[o.id] = o;
    switch (o.type) {
      case "T1": // job started
        self.currentJobs += 1;
        break;
      case "T2": // job finished — only decrement for jobs we saw start
        if (self.allMessages[o.id]) {
          // delete that one
          self.currentJobs -= 1;
        }
        break;
      default:
        // unknown message type: ignore
        break;
    }
    // Fan the raw message out to every connected viewer.
    io.of(self.homepage).emit('aggregates', message);
  };

  // Periodic rollup, invoked by the setInterval loop in the config section.
  self.aggregate = function () {
    // calculate all agregations here!
    // then emit them to the webpages
    io.of(self.homepage).emit('aggregates', 'hello!');
  };

  self.init(homepage, topicname);
}
/**
 * Tracker sketch for the API topic.
 *
 * Accumulates per-task aggregate totals keyed by `task_id`.
 *
 * NOTE(review): the body of this function was non-parsing pseudo-code in
 * the original (stray `<----` arrows, `JSON.stirgify`, unbalanced braces),
 * which made the whole file fail to parse. It has been reduced to the
 * smallest syntactically valid skeleton that preserves the author's intent,
 * with the design notes kept as comments below. `event`, `countAggregate`,
 * and `totalsAgg` are not defined anywhere in this file and must be wired
 * up before this tracker can run — TODO confirm their intended sources.
 */
function apiTracker() {
  var self = this;
  self.aggtotals = {};

  event.on('message', function (data) {
    countAggregate(data, self.aggtotals, 'task_id');
    totalsAgg();
  });

  // Design notes preserved from the original pseudo-code:
  //
  //   incoming message:  { task_id: "123456", message: "sup" }
  //   obj = look up the aggregate entry matching task_id
  //   obj.messages.push(message)
  //
  //   aggtotals["123456"] = {
  //     "status": 1,
  //     "messages": [],
  //     "stop":  <timestamp>,   // e.g. "2341654324132165341"
  //     "start": <timestamp>    // e.g. "635469843216584163"
  //   }
  //
  //   (the original also sketched JSON.stringify-ing the tracker state
  //    for transport to the client)
}
// this is a comment!!!
// consumer.on -> parse -> aggregate
// -> send relevant data to sockets
// some sort of ongoing timer that fires off aggregate jobs, should be synchronized?
//io.sockets.on('connection', function (socket) {
// socket.on('my other event', function (data) {
// console.log(data);
// });
//});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment