Skip to content

Instantly share code, notes, and snippets.

@automenta
Forked from shimondoodkin/distcomponents.js
Last active August 29, 2015 14:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save automenta/1290581b6bb7ee637af2 to your computer and use it in GitHub Desktop.
Save automenta/1290581b6bb7ee637af2 to your computer and use it in GitHub Desktop.
// this does data collection + inserting , processing and inserting using p2p components self discovery and connection
//
// this is not complete it missing removing of sockets when disconnected and limiting zmq high water limit to something small or and maybe to reconect the same socket on connection loss.// maybe add predictable unique names to sockets of componets so data will reflow by zmq correctly on reconnect. maybe to maintain a starting point to reprocess from and load from database.
//
// to run this you need zmq.js and distcomponents.js and npm insta xxhashjs, and install telepathine module from git and replace in it the file telepathine.js with my file(it makes beleive work as expected)
// made on 24/6/2014 expected to run on node 0.10...
process.on('uncaughtException', function (err)
{
w=true;
console.log('Caught exception2: ' + (err.stack||err));
});
//collect
//inserter listens to data
//calculator
//flat=require('./flat.js');
//require('./db.js');
require('./zmq.js')
zmq_telepathine_start();
if(!global.appid)appid=Math.round(Math.random()*10000)+1;
myapp_announcer_start=function()
{
var component_description={
name:'myapp_announcer',
inputs: {
//'announcer':{'zmqport':zmqport,externalip:zmq_getExternalIp()}
},
//output not neded only needed to detect unconnected fully components which is rare case
outputs: {
// 'dbinserter':{'zmqport':zmqport.replace(/\*|127.0.0.1/,getExternalIp()},
// 'dataprocessor':{'zmqport':zmqport.replace(/\*|127.0.0.1/,getExternalIp()}
}
};
var clients={
//'myapp_dbinserter':
//[
//{ url:'url://fff.com:4500',type:'announcer' }
//]
}
var clients_all=[];
Object.defineProperty(component_description, "clients", { value : clients } );
Object.defineProperty(component_description, "clients_all", { value : clients_all } );
Object.defineProperty(component_description, "connect", { value : function(c){return connect(c)} }); //, enumerable:false is default
Object.defineProperty(component_description, "isconnected", { value : function(c){return isconnected(c)} }); //, enumerable:false is default
var clients=component_description.clients
//var dbinserter_clients=component_description.clients.myapp_dbinserter
function isconnected(other_component_description)
{
var endpoint=other_component_description.inputs[component_description.name];
var myexternalip=zmq_getExternalIp();
var endpointport=endpoint.zmqport.replace(/127.0.0.1|0.0.0.0|\*/,myexternalip==endpoint.externalip?'127.0.0.1':endpoint.externalip);
return clients_all.filter(function(zmqcon){ return zmqcon.last_endpoint==endpointport}).length>0;
}
function connect(other_component_description)
{
if(!(component_description.name in other_component_description.inputs))
{
console.log(new Error("myapp_announcer_start - trying to connect wrong components. "+other_component_description.name+' doesnt have in inputs '+component_description.name+'.').stack," to ","other_component_description=",other_component_description," from " ,'component_description=',component_description)
return
}
//if(isconnected(other_component_description))
//{
// console.log(new Error("myapp_announcer_start - already connected components. "+other_component_description.name+' doesnt have in inputs '+component_description.name+'.').stack," to ","other_component_description=",other_component_description," from " ,'component_description=',component_description)
// return
//}
var endpoint=other_component_description.inputs[component_description.name];
var port=endpoint.zmqport;
var myexternalip=zmq_getExternalIp();
port=port.replace(/127.0.0.1|0.0.0.0|\*/,myexternalip==endpoint.externalip?'127.0.0.1':endpoint.externalip);
var zmq1 = zmqconnect(port);
if(!clients[other_component_description.name]){clients[other_component_description.name]=[];}
clients[other_component_description.name].push(zmq1);
clients_all.push(zmq1);
}
//function sendclients(type,data)
//{
// var ct=clients[type];
// for(var i=0;i<ct.length;i++)
// {
// ct[i].send(data);
// }
//}
function sendclients_dbinserter(data)
{
//console.log('announcer: clients_all.len',clients_all.length);
//var ct=dbinserter_clients;
var ct=clients_all;
for(var i=0;i<ct.length;i++)
{
console.log('announcer: sending data to',ct[i].last_endpoint);
ct[i].send(data);
}
}
var clientid=Math.round(Math.random()*1000)+1;
var dedupsendstate1= {prev_send:null,count_send:0};
if(global.w==undefined)w=false;
// the kind of simple emiter of data to insert
setInterval(function(){/// T
if(w)return;
var d = new Date();
var n = (d.getMinutes()*60)+d.getSeconds();
n=Math.floor(n/3);
sendclients_dbinserter( zmqdedupsend(['time',n],dedupsendstate1,clientid) )
},3000);
zmq_telepathine_addcomponent(component_description);
return component_description;
}
myapp_dbinserter_start=function()
{
var component_description={
name:'myapp_dbinserter',
inputs: {
'myapp_announcer':{'zmqport':zmqport,externalip:zmq_getExternalIp()} // myapp_announcer can connect to this port
,'myapp_processor':{'zmqport':zmqport,externalip:zmq_getExternalIp()} // myapp_processor can connect to this port
},
//output not neded only needed to detect unconnected fully components which is rare case
outputs: {
// 'myapp_dbinserter':{'zmqport':zmqport.replace(/\*|127.0.0.1/,getExternalIp()},
// 'dataprocessor':{'zmqport':zmqport.replace(/\*|127.0.0.1/,getExternalIp()}
}
};
var clients= {
//'myapp_dbinserter':
//[
//{ url:'url://fff.com:4500',type:'announcer' }
//]
}
var clients_all=[];
Object.defineProperty(component_description, "clients", { value : clients } );
Object.defineProperty(component_description, "clients_all", { value : clients_all } );
Object.defineProperty(component_description, "servers", { value : []} );
Object.defineProperty(component_description, "connect", { value : function(c){return false; return connect(c)} , enumerable:false });
Object.defineProperty(component_description, "isconnected", { value : function(c){return false; isconnected(c)} }); //, enumerable:false is default
//accept myapp_announcer:
var dbinserterport = 'tcp://*:0';
var zmqs_myapp_log=zmqlisten(dbinserterport,'dbinserter');
component_description.servers.push(zmqs_myapp_log)
component_description.inputs.myapp_announcer.zmqport=zmqs_myapp_log.last_endpoint;
console.log("setting:",'last_component:'+component_description.name+'.myapp_announcer.peer',zmq_telepathine_self.peer_name)
zmq_telepathine.set('last_component:'+component_description.name+'.myapp_announcer',zmqs_myapp_log.last_endpoint);
zmq_telepathine.set('last_component:'+component_description.name+'.myapp_announcer.peer',zmq_telepathine_self.peer_name);
var receivestate1={emitedh:[],emitedt:[],emitedd:[]}
zmqs_myapp_log.on("message", function(str)
{
var re=dedupreceive(str.toString(),receivestate1);
if(re!==undefined)
{
var masterpeername= zmq_telepathine.get('last_component:'+component_description.name+'.myapp_announcer.peer');
if( !( zmq_telepathine.peers[ masterpeername ].alive || masterpeername==zmq_telepathine_self.peer_name ) )
{
console.log('peer dead back to us1');
zmq_telepathine.set('last_component:'+component_description.name+'.myapp_announcer',zmqs_myapp_log.last_endpoint);
zmq_telepathine.set('last_component:'+component_description.name+'.myapp_announcer.peer',zmq_telepathine_self.peer_name);
console.log("setting:",'last_component:'+component_description.name+'.myapp_announcer.peer',zmq_telepathine_self.peer_name);
}
if( zmq_telepathine.get('last_component:'+component_description.name+'.myapp_announcer') !=zmqs_myapp_log.last_endpoint) {console.log('i am not master1'); return;}
//var x=flat.flat(re);x['app']=appid;//commented out to remove flat dependency for demonstaration
x=re
//console.log('dbinsert');
console.log('dbinsert','public.myapp_log',x);
//dbinsert('public.myapp_log',x);
}
});
//accept myapp_processor:
var dbinserterport2 = 'tcp://*:0';
var zmqs_insert=zmqlisten(dbinserterport2);
component_description.servers.push(zmqs_insert)
component_description.inputs.myapp_processor.zmqport=zmqs_insert.last_endpoint;
zmq_telepathine.set('last_component:'+component_description.name+'.myapp_processor',zmqs_insert.last_endpoint);
zmq_telepathine.set('last_component:'+component_description.name+'.myapp_processor.peer',zmq_telepathine_self.peer_name);
var receivestate2={emitedh:[],emitedt:[],emitedd:[]}
zmqs_insert.on("message", function(str)
{
var re=dedupreceive(str.toString(),receivestate2);
if(re!==undefined)
{
var masterpeername= zmq_telepathine.get('last_component:'+component_description.name+'.myapp_processor.peer');
if( !(zmq_telepathine.peers[ masterpeername ].alive || masterpeername==zmq_telepathine_self.peer_name ) )
{
console.log('peer dead back to us2');
zmq_telepathine.set('last_component:'+component_description.name+'.myapp_processor',zmqs_insert.last_endpoint);
zmq_telepathine.set('last_component:'+component_description.name+'.myapp_processor.peer',zmq_telepathine_self.peer_name);
}
if( zmq_telepathine.get('last_component:'+component_description.name+'.myapp_processor') !=zmqs_insert.last_endpoint) {console.log('i am not master2'); return;}
//var x=flat.flat(re);x['app']=appid;
//console.log('dbinsert2');
console.log('dbinsert2',re[0],re[1]);
//dbinsert('public.myapp_log',x);
}
});
zmq_telepathine_addcomponent(component_description);
return component_description;
}
myapp_processor_start=function()
{
var component_description={
name:'myapp_processor',
inputs: {
'myapp_announcer':{'zmqport':zmqport,externalip:zmq_getExternalIp()} // myapp_announcer can connect to this port
//,'myapp_processor':{'zmqport':zmqport,externalip:zmq_getExternalIp()} // myapp_processor can connect to this port
},
//output not neded only needed to detect unconnected fully components which is rare case
outputs: {
// 'myapp_dbinserter':{'zmqport':zmqport.replace(/\*|127.0.0.1/,getExternalIp()},
}
}
var clients= {
'myapp_dbinserter':
[
//{ url:'url://fff.com:4500',type:'announcer' }
]
}
var clients_all=[];
Object.defineProperty(component_description, "clients", { value : clients } );
Object.defineProperty(component_description, "clients_all", { value : clients_all } );
Object.defineProperty(component_description, "connect", { value : function(c){return connect(c)} , enumerable:false });
Object.defineProperty(component_description, "isconnected", { value : function(c){return isconnected(c)} }); //, enumerable:false is default
var clients=component_description.clients
var dbinserter_clients=component_description.clients.myapp_dbinserter
function isconnected(other_component_description)
{
var endpoint=other_component_description.inputs[component_description.name];
var myexternalip=zmq_getExternalIp();
var endpointport=endpoint.zmqport.replace(/127.0.0.1|0.0.0.0|\*/,myexternalip==endpoint.externalip?'127.0.0.1':endpoint.externalip);
return clients_all.filter(function(zmqcon){ return zmqcon.last_endpoint==endpointport}).length>0;
}
function on_zmq_client_message(zmq_client,message)
{
console.log('on_zmq_client_message(zmq_client,message) '+message.toString());
}
function connect(other_component_description)
{
if(!(component_description.name in other_component_description.inputs))
{
console.log(new Error("myapp_announcer_start - trying to connect wrong components. "+other_component_description.name+' doesnt have in inputs '+component_description.name+'.').stack," to ","other_component_description=",other_component_description," from " ,'component_description=',component_description)
return
}
//if(isconnected(other_component_description))
//{
// console.log(new Error("myapp_announcer_start - already connected components. "+other_component_description.name+' doesnt have in inputs '+component_description.name+'.').stack," to ","other_component_description=",other_component_description," from " ,'component_description=',component_description)
// return
//}
var endpoint=other_component_description.inputs[component_description.name];
var port=endpoint.zmqport;
var myexternalip=zmq_getExternalIp();
port=port.replace(/127.0.0.1|0.0.0.0|\*/,myexternalip==endpoint.externalip?'127.0.0.1':endpoint.externalip);
var zmq1 = zmqconnect(port);
if(!clients[other_component_description.name]){clients[other_component_description.name]=[];}
clients[other_component_description.name].push(zmq1);
clients_all.push(zmq1);
}
//function sendclients(type,data)
//{
// var ct=clients[type];
// for(var i=0;i<ct.length;i++)
// {
// ct[i].send(data);
// }
//}
function sendclients_dbinserter(data)
{
var ct=dbinserter_clients;
for(var i=0;i<ct.length;i++)
{
ct[i].send(data);
}
}
var clientid=Math.round(Math.random()*1000)+1;
var dedupsendstate1= {prev_send:null,count_send:0};
function ex_dbinsert(t,d)
{
sendclients_dbinserter( zmqdedupsend([t,d],dedupsendstate1,clientid) )
//sendclients_dbinserter([t,d])
}
var dbinserterport = 'tcp://*:0';
var zmqs_myapp_log=zmqlisten(dbinserterport);
component_description.inputs.myapp_announcer.zmqport=zmqs_myapp_log.last_endpoint;
var receivestate1={emitedh:[],emitedt:[],emitedd:[]}
zmqs_myapp_log.on("message", function(str)
{
console.log('myapp processor: received data');
var re=dedupreceive(str.toString(),receivestate1);
if(re!==undefined)
{
//if(re[0]=='data') myapp_extract.process(re[1]); /// T
if(re[0]=='time') ex_dbinsert('public.time',re[1]); /// T
console.log('myapp processor: data=',re);
//var x=flat.flat(re);x['app']=appid;
//console.log('dbinsert','public.myapp_log',x);
//dbinsert('public.myapp_log',x);
}
});
zmq_telepathine_addcomponent(component_description);
return component_description;
}
zmq_telepathine_start=function ()
{
setTimeout(function(){
myapp_announcer_start();
myapp_dbinserter_start();
myapp_processor_start();
},7000); // time out to be able to set w=true to stop it from emiting announces to do debug work, not requiered
}
var repl = require("repl");repl.start({ useGlobal:true, useColors:true, });
var
PeerState = require('./peer_state').PeerState,
Scuttle = require('./scuttle').Scuttle,
EventEmitter = require('eventemitter2').EventEmitter2,
net = require('net'),
util = require('util'),
child_process = require('child_process'),
dns = require('dns'),
debug = require('debug')('telepathine'),
dgram = require('dgram'),
_ = require('lodash'),
ipcompress = require('./ip');
//Network Configuration - should be true for all peers
var defaultUdp = true;
var defaultGossipIntervalMS = 2500;
var defaultHeartbeatIntervalMS = 2500;
var udpMaxMessageSize = 575; //1400 MTU limit
var eventDefaultTTL = 2500 * 8; //how long events remain active for
/*
Default Options:
options = {
// For IPv4 use [a.b.c.d]:port, ex: 192.168.0.100:1234
// For IPv6 use the format [ad:dre::ss]:port, ex: [::1]:9000
address: '127.0.0.1', // localhost
// Whether to emit value change events on heartbeats
emitValueOnHeartBeat: false,
// Manual Network address translation
addressMap: {
//key: value //key = address mapped from, value = address mapped to
},
// Network ID, used to encrypt messages, secured from non-network message. undefined=public, no encryption
network: "Preshared_Network_Key",
udp: true, //whether to run UDP server (recommended)
gossipIntervalMS: 2500, //how often (ms) to send gossip updates
heartbeatIntervalMS: 2500 //how often (ms) to send heartbeat updates
};
*/
var Telepathine = exports.Telepathine = function (port, seeds, options) {
var self = this;
EventEmitter.call(this, { wildcard: true, delimiter: ':' });
if (!options) options = { };
if (typeof port !== 'number' || port === 0)
throw new Error('must specify a port');
this.public = true; //true = allow WAN connections, false = only LAN
this.peers = {};
this.address = options.address || '127.0.0.1';
this.addressMap = options.addressMap || { };
this.network = options.network;
if (this.network) {
var crypto = require('crypto');
var alg = 'aes192';
var key = crypto.createHash('sha256').update(this.network).digest();
this.networkEncipher = function(input) {
var c = crypto.createCipher(alg, key);
var a1 = c.update(input, 'utf8');
var a2 = c.final();
var b = Buffer.concat([a1, a2]);
return b;
}
this.networkDecipher = function(input) {
var c = crypto.createDecipher(alg, key);
var a1 = c.update(input, 'binary', 'utf8');
var a2 = c.final('utf8');
return a1 + a2;
}
}
this.port = port;
this.seeds = seeds || [];
this.my_state = new PeerState(this.port, this.address);
this.peer_name = net.isIPv6(this.address) ? ['[' + this.address + ']', this.port.toString()].join(':') : [this.address, this.port.toString()].join(':');
this.listenToExpiredKeys(this.my_state)
this.localEventNumber = 0; //should this be Date.now() and increment by milliseconds? this way if a peer gets restarted, it should have a unique set of events in the system, assuming they live longer than the reconnect interval
this.localEventPrefix = ipcompress.IPStringToB64(this.address+':'+this.port);
this.beatHeart = true;
this.emitValueOnHeartBeat = options.emitValueOnHeartBeat || false;
this.gossipIntervalMS = options.gossipIntervalMS || defaultGossipIntervalMS;
this.heartbeatIntervalMS = options.heartbeatIntervalMS || defaultHeartbeatIntervalMS;
this.udp = options.udp || defaultUdp;
this.scuttle = new Scuttle(this.peers);
};
util.inherits(Telepathine, EventEmitter);
var fs = require('./fs');
exports.FileInput = fs.FileInput;
exports.FileOutput = fs.FileOutput;
exports.FileSync = fs.FileSync;
Telepathine.prototype.start = function (callback) {
var self = this;
//TCP server
this.server = net.createServer(function (socket) {
var msgSize = 0;
var msgPosition = 0;
var message = null;
socket.on('data', function (m) {
if (message == null) {
//get the message size from first 4 bytes of m
msgSize = m.readUInt32BE(0) ;
message = new Buffer(msgSize);
m.copy(message, 0, 4); //skip the size bytes
msgPosition += m.length - 4;
}
else {
m.copy(message, msgPosition, 0, msgSize - msgPosition);
msgPosition += m.length;
//TODO what if a new message begins in this buffer?
}
if (msgPosition == msgSize) {
self.handleMessage(socket, message);
message = null;
msgSize = msgPosition = 0;
}
});
socket.on('error', function (e) {
if (debug.enabled) debug('%s => %s error: %s', socket.remoteAddress + socket.remotePort, e);
})
});
function start() {
if (debug.enabled) debug('%s TCP start', self.peer_name);
if (callback)
callback(self)
self.running = true;
self.emit('start', self)
}
// Bind to ip/port
if (this.address) {
this.my_state.address = this.address;
this.my_state.port = this.port;
this.peers[this.peer_name] = this.my_state;
this.server.listen(this.port, (self.public ? null : this.address), start);
} else {
// this is an ugly hack to get the hostname of the local machine
// we don't listen on any ip because it's important that we listen
// on the same ip that the server identifies itself as
child_process.exec('hostname', function (error, stdout, stderr) {
var l = stdout.length;
var hostname = stdout.slice(0, l - 1);
dns.lookup(hostname, 4, function (err, address, family) {
self.address = address;
self.my_state.address = self.address;
self.my_state.port = self.port;
self.peers[self.peer_name] = self.my_state;
self.server.listen(self.port, (self.public ? null : address), start);
});
});
}
if (this.udp) {
var udpServer = this.udpServer = dgram.createSocket("udp4");
// Listen for message events on the socket.
udpServer.on("message", function (message, r /* request info */ ) {
self.handleMessage(udpServer, message, {
address: r.address,
port: r.port
});
});
udpServer.on("error", function (error) {
if (debug.enabled) debug('%s UDP error', self.peer_name, error);
});
// When the socket is configured and ready to receive data
udpServer.on("listening", function () {
//var address = socket.address();
//console.log( "socket listening " + address.address + ":" + address.port );
if (debug.enabled) debug('%s UDP start', self.peer_name);
});
udpServer.bind(this.port);
}
for (var i = 0; i < this.seeds.length; i++) {
if (this.seeds[i] === this.peer_name)
throw new Error('cannot specify self as seed')
}
// another ugly hack :(
var seeds = {};
for (var i = 0; i < this.seeds.length; i++)
seeds[this.seeds[i]] = undefined;
this.handleNewPeers(seeds);
if (this.beatHeart)
this.heartBeatTimer = setInterval(function () {
self.my_state.beatHeart()
}, this.heartbeatIntervalMS);
this.gossipNow();
this.on('set', function (peer, k, v) {
if (k.indexOf('say:')===0) {
//TODO avoid reprocessing events
k = k.split(':');
var whichEvent = k[1];
self.emit('say:' + whichEvent, v, peer);
}
});
return this;
}
/* force immediate broadcast and reset gossip timer
todo: optional parameter to skip immediate gossip if next gossip is below a duration threshold
*/
Telepathine.prototype.gossipNow = function() {
if (this.gossipTimer)
clearInterval(this.gossipTimer);
var self = this;
this.gossip();
this.gossipTimer = setInterval(function () {
self.gossip()
}, this.gossipIntervalMS);
};
Telepathine.prototype.stop = function () {
try{this.server.close();
}catch(e){}
if (this.udpServer)
try{
this.udpServer.close();
}catch(e){}
clearInterval(this.heartBeatTimer);
clearInterval(this.gossipTimer);
this.running = false;
var self = this;
this.emit('stop', self)
};
/* distribute an event by setting a local key with the 'say:' prefix.
ttl: relative time in milliseconds that the event will be active in the network
buffered: true=batch the event with the next update, false=send immediately (default)
*/
Telepathine.prototype.say = function (event, data, ttl /* in milliseconds*/, buffered) {
//TODO add anonymous parameter?
if (!ttl)
ttl = eventDefaultTTL;
var eventID = this.localEventPrefix + '_' + this.localEventNumber++;
var eventname = 'say:' + event + ':' + eventID;
this.set(eventname, data, Date.now() + ttl);
if (!buffered)
this.gossipNow();
};
Telepathine.prototype.hear = function (event, callback) {
this.on('say:' + event, function(value, peer) {
this.event = this.event.split(':')[1];
callback.apply(this, [value, peer]);
});
};
Telepathine.prototype.hearOnce = function (event, callback) {
this.once('say:' + event, function(value, peer) {
this.event = this.event.split(':')[1];
callback.apply(this, [value, peer]);
});
};
Telepathine.prototype.know = function (key, callback) {
this.on('set:' + key, function(peer, key, value, ttl) {
this.event = this.event.split(':')[1];
if (callback)
callback.apply(this, [peer, key, value, ttl]);
});
};
function handle_believe(peer, key, value, ttl, that, callback) {
console.log('1***believe',peer, key, value, ttl)
if (peer!=that.peer_name) {
if(this.event.substring(0,8)=='set:say:')return; // don't beleive say: events(creates unwanted recursion) when doing a.believe(*,fn)
console.log('4***believe',this.event,peer, key, value, ttl)
if (!_.isEqual(that.get(key), value)) {
that.set(key, value, ttl);
if (callback) {
callback.apply(this, [peer, key, value, ttl]);
}
}
}
}
Telepathine.prototype.believe = function (key, callback) {
console.log('believe - set ',key)
var that = this;
this.on('set:' + key, function(peer, key, value, ttl){handle_believe.call(this,peer, key, value, ttl, that, callback)} );
};
Telepathine.prototype.after = function (delayMS, f) {
var self = this;
function a() { setTimeout(f.bind(self), delayMS); }
if (this.running) a();
else this.once('start', a);
return this;
};
Telepathine.prototype.every = function (intervalMS, f) {
var self = this;
function a() { setInterval(f.bind(self), delayMS); }
if (this.running) a();
else this.once('start', a);
};
// The method of choosing which peer(s) to gossip to is borrowed from Cassandra.
// They seemed to have worked out all of the edge cases
// http://wiki.apache.org/cassandra/ArchitectureGossip
Telepathine.prototype.gossip = function () {
//this.emit('gossip start');
var now = Date.now();
for (var p in this.peers)
this.peers[p].expireLocalKeys(now);
var livePeers = this.livePeers();
// Find a live peer to gossip to
var livePeer;
if (livePeers.length > 0) {
livePeer = this.chooseRandom(livePeers);
this.gossipToPeer(livePeer);
}
var deadPeers = this.deadPeers();
// Possilby gossip to a dead peer
var prob = deadPeers.length / (livePeers.length + 1)
if (Math.random() < prob) {
var deadPeer = this.chooseRandom(deadPeers);
this.gossipToPeer(deadPeer);
}
//TODO this following comment is from the original fork, i dont understand
//why it says "gossip to seed" but chooses a peer from all the peers
// Gossip to seed under certain conditions
if (livePeer && !this.seeds[livePeer] && livePeers.length < this.seeds.length) {
if (Math.random() < (this.seeds / this.peers.length)) {
var p = this.chooseRandom(this.allPeers())
this.gossipToPeer(p);
}
}
// Check health of peers
for (var i in this.peers) {
var peer = this.peers[i];
if (peer !== this.my_state) {
peer.isSuspect();
}
}
};
Telepathine.prototype.chooseRandom = function (peers) {
// Choose random peer to gossip to
var i = Math.floor(Math.random() * 1000000) % peers.length;
return this.peers[peers[i]];
};
Telepathine.prototype.respondTCP = function (m, socket) {
var b;
if (!Buffer.isBuffer(m)) {
var mjson = JSON.stringify(m);
if (this.networkEncipher)
b = this.networkEncipher(new Buffer(mjson, 'utf8'));
else
b = new Buffer(mjson, "utf8");
}
else {
b = m;
}
var msgSize = new Buffer(4);
msgSize.writeUInt32BE(b.length, 0);
socket.write(msgSize);
socket.write(b);
};
//attempt to send a UDP packet, but if the message is too large, use TCP
Telepathine.prototype.sendMessage = function (m, address, port) {
var self = this;
var mjson = JSON.stringify(m);
//console.log('send ', mjson);
//TODO unify encipher with the TCP method
var b;
if (this.networkEncipher)
b = this.networkEncipher(new Buffer(mjson, 'utf8'));
else
b = new Buffer(mjson, "utf8");
if ((this.udp) && (b.length < udpMaxMessageSize)) {
this.udpServer.send(
b,
0, // Buffer offset
b.length,
port,
address,
function (error, byteLength) {
if (debug.enabled) debug('gossip:udp %s => %s, type %s, %s bytes', self.peer_name, (address + ':' + port), m.t, b.length)
}
);
return;
}
var gosipeeSocket = new net.createConnection(port, address);
/*gosipeeSocket.on('data', function (msg) {
if (debug.enabled) debug('gossip:tcp:data %s => %s, type %s, %s bytes', (address + ':' + port), self.peer_name, (msg.t + ''), JSON.stringify(msg).length)
self.handleMessage(gosipeeSocket, msg);
});*/
// when we are connected, send a request message
gosipeeSocket.on('connect', function () {
if (debug.enabled) debug('gossip:tcp:connect %s => %s, type %s, (%s bytes sent)', self.peer_name, address+':'+port, m.t, b.length)
self.respondTCP(b, gosipeeSocket);
});
gosipeeSocket.on('error', function (exception) {
if (debug.enabled) debug('gossip:tcp:error %s => %s : %s', self.peer_name, address+':'+port, exception);
});
gosipeeSocket.on('close', function () {
if (debug.enabled) debug('gossip:tcp:close %s => %s', self.peer_name, address+':'+port)
});
}
Telepathine.prototype.resolveAddress = function(a) {
var m = this.addressMap[a];
if (m) return m;
return a;
}
//TODO use an 'initialMessage' parameter (defaulting to self.requestMessage())' allowing TCP connect at any point in the protocol
Telepathine.prototype.gossipToPeer = function (peer) {
var resolvedPeerAddress = this.resolveAddress(peer.address);
if ( (this.port == peer.port) && ( (this.address == peer.address) || (resolvedPeerAddress == this.address)) ) {
//console.error('gossiping to self');
return;
}
if (debug.enabled) debug('gossip %s %s => %s %s', this.address, this.port, resolvedPeerAddress, peer.port)
this.sendMessage(this.requestMessage(), peer.address, peer.port);
}
Telepathine.REQUEST = 0;
Telepathine.FIRST_RESPONSE = 1;
Telepathine.SECOND_RESPONSE = 2;
Telepathine.prototype.handleMessage = function (socket, msg, fromPeer) {
var self = this;
if (self.networkDecipher) {
msg = (socket == this.udpServer) ? msg : new Buffer(msg, 'binary');
try {
msg = self.networkDecipher(msg);
}
catch (e) {
if (debug.enabled) {
if (socket == this.udpServer)
debug('%s => %s bad UDP message', fromPeer.address+':'+fromPeer.port, self.peer_name);
else
debug('%s => %s bad TCP message', socket.remoteAddress + socket.remotePort, self.peer_name);
}
return;
}
}
try {
msg = JSON.parse(msg.toString('utf8'));
}
catch (e) {
if (debug.enabled) {
debug("invalid packet (" + msg.length + ' bytes)');
}
}
switch (msg.t) {
case Telepathine.REQUEST:
var msg = this.firstResponseMessage(msg.d);
if (socket == this.udpServer) {
this.sendMessage(msg, fromPeer.address, fromPeer.port);
} else {
this.respondTCP(msg, socket);
}
break;
case Telepathine.FIRST_RESPONSE:
if (msg.u)
this.scuttle.updateKnownState(msg.u);
var msg = this.secondResponseMessage(msg.r);
if (socket == this.udpServer) {
this.sendMessage(msg, fromPeer.address, fromPeer.port);
} else {
this.respondTCP(msg, socket);
socket.end();
}
break;
case Telepathine.SECOND_RESPONSE:
this.scuttle.updateKnownState(msg.u);
if (socket == this.udpServer) {
//..
} else {
socket.end();
}
break;
default:
// something went bad
break;
}
}
// MESSSAGES
Telepathine.prototype.handleNewPeers = function (newPeers) {
var self = this;
for (var p in newPeers) {
var peer_info;
// TODO can this be done without regex?
var m = p.match(/\[(.+)\]:([0-9]+)/);
var address;
var port;
if (m) {
address = m[1];
port = m[2];
} else {
m = p.split(':');
address = m[0];
port = m[1];
}
var resolvedAddress = this.resolveAddress(address || '127.0.0.1');
var resolvedName = resolvedAddress + ':' + port;
if (this.peer_name == resolvedName) {
//trying to add self, skip
continue;
}
var tp = new PeerState(parseInt(port), resolvedAddress);
tp.name = resolvedName;
tp.metadata = newPeers[p]
this.peers[tp.name] = tp;
this.emit('peer:new', tp);
this.listenToPeer(tp);
}
}
Telepathine.prototype.listenToPeer = function (peer) {
var self = this;
if (peer.name === this.peer_name) {
//throw new Error('cannot listen to itself')
return;
}
var peerName = peer.name;
this.listenToExpiredKeys(peer)
peer.on('update', function (k, v, ttl) {
console.log("peer.on('update', function (",peerName, k, v, ttl)
// heartbeats are disabled by default but it can be changed so this takes care of that
if ((k !== PeerState.heartbeat) || (self.emitValueOnHeartBeat)) {
if (k !== PeerState.heartbeat)
{
console.log("2peer.on('update', function (",peerName, k, v, ttl)
var x={event:'set:' + k}; x.__proto__=self;
handle_believe.call(x,peerName, k, v, ttl, self, function(){console.log('3just beleived',arguments)})
}
self.emit('set', peerName, k, v, ttl);
self.emit('set:' + k, peerName, k, v, ttl);
}
});
peer.on('peer_alive', function () {
self.emit('peer:start', peerName);
});
peer.on('peer_failed', function () {
self.emit('peer:stop', peerName);
});
}
Telepathine.prototype.listenToExpiredKeys = function (peer) {
var self = this;
peer.on('expire', function (k, v, ttl) {
self.emit('key:expire', self.my_state.name, k, v, ttl);
});
}
Telepathine.prototype.requestMessage = function () {
var m = {
t: Telepathine.REQUEST, //type
d: this.scuttle.digest() //digest
};
return m;
};
Telepathine.prototype.firstResponseMessage = function (peer_digest) {
var sc = this.scuttle.scuttle(peer_digest)
this.handleNewPeers(sc.new_peers)
var m = {
t: Telepathine.FIRST_RESPONSE, //type
};
//request_digest
if (_.keys(sc.requests) > 0)
m.r = sc.requests;
//updates
if (sc.deltas.length > 0)
m.u = sc.deltas;
return m;
};
Telepathine.prototype.secondResponseMessage = function (requests) {
var m = {
t: Telepathine.SECOND_RESPONSE
};
//updates
var u = this.scuttle.fetchDeltas(requests)
if (u.length > 0)
m.u = u;
return m;
};
Telepathine.prototype.set = function (k, v, expiresAt) {
this.my_state.updateLocal(k, v, expiresAt);
}
Telepathine.prototype.get = function (k) {
return this.my_state.getValue(k);
};
Telepathine.prototype.getRemoteKeys = function (peer) {
return this.peers[peer].getKeys();
}
Telepathine.prototype.getRemote = function (peer, k) {
return this.peers[peer].getValue(k);
}
Telepathine.prototype.allPeers = function () {
var keys = [];
for (var k in this.peers) {
var peer = this.peers[k];
if (peer !== this.my_state)
keys.push(k)
}
return keys;
}
Telepathine.prototype.livePeers = function () {
var keys = [];
for (var k in this.peers) {
var peer = this.peers[k];
if (peer !== this.my_state && peer.alive) {
keys.push(k)
}
}
return keys;
}
Telepathine.prototype.deadPeers = function () {
var keys = [];
for (var k in this.peers) {
var peer = this.peers[k];
if (peer !== this.my_state && !peer.alive) {
keys.push(k)
}
}
return keys;
}
/*
*
* Pipeline
*
*/
var zmq = require('zmq'),EventEmitter = require("events").EventEmitter,XXH=require('xxhashjs');
zmqport = 'tcp://127.0.0.1:7845';
zmqsockets=[];
zmqclientid="";
zmqdedupprint=false;
zmqconnect=function (zmqport)
{
//if(clientid!==undefined)zmqclientid=clientid;
var socket = zmq.socket('pub'); // push = upstream
zmqsockets.push(socket);
socket.identity = 'upstream' + process.pid;
socket.connect(zmqport);
console.log('zmqconnect connected!',socket.last_endpoint);
return socket;
}
zmqdedupsend=function(data,state,clientid,hashdata)// var state= {prev_send:null,count_send:0,zmqsend:zmqsend}
{
//if(clientid===undefined)clientid=zmqclientid;
//console.log('send ',data);
var hashstring,d
if(hashdata) //custom case
{
hashstring=JSON.stringify(hashdata);
d=JSON.stringify(data);
}
else if(data&&data.a1_otimestamp) //my case
{
var dataclone=JSON.parse(d=JSON.stringify(data))
dataclone.a1_otimestamp=Math.round(dataclone.a1_otimestamp/30000);
hashstring=JSON.stringify(dataclone);
}
else if(data&&data.timestamp) //probable case
{
var dataclone=JSON.parse(d=JSON.stringify(data))
dataclone.timestamp=Math.round(dataclone.timestamp/30000);
hashstring=JSON.stringify(dataclone);
}
else // no relativly simular timestamps
{
d=hashstring=JSON.stringify(data);
}
if(state.prev_send==hashstring) state.count_send++; else state.count_send=0;
var h = XXH( hashstring , 0xABCD ).toString(16)+state.count_send;
state.prev_send=hashstring;
return clientid+' '+h+' '+d;
}
//example:
//var sendstate1= {prev_send:null,count_send:0}
//var clientid1=1
//zmqsocket.send(zmqdedupsend(object,sendstate1,clientid1));
//zmqevents = new EventEmitter();
zmqlisten=function (port,name)
{
var socket = zmq.socket('sub'); //pull = downstream
socket.subscribe('');
zmqsockets.push(socket);// for debug
//socket.subscribe('');
socket.identity = 'downstream' + (name||process.pid);
socket.bindSync(port!==undefined?port:zmqport)
console.log('zmqlisten - bound!',socket.last_endpoint);
socket.port=socket.last_endpoint;
//socket.on('message', function(data) {
//// zmqevents.emit(data.toString());
//console.log(socket.identity + ': received data ' + data.toString(),r);
//});
return socket;
}
dedupreceive=function (str,state) { //var state={emitedh:[],emitedt:[],emitedd:[]}
var r=str.split(' ',2);r[2]=str.substring(r[0].length+r[1].length+2, str.length);
var client=r[0],hash=r[1],data=r[2];
if(state.emitedh.lastIndexOf(hash)==-1)
{
//console.log('orig ',data);
state.emitedt.push(new Date().getTime());
state.emitedh.push(hash);
//state.emitedd.push(data);
var t=new Date().getTime();
if(t-state.emitedt[0]>120000)
{
var i,emitedt=state.emitedt;
for(i=0;i<emitedt.length&&t-emitedt[i]>60000;i++){}
if(zmqdedupprint) console.log('cleanup array remove '+i+' from '+emitedt.length)
state.emitedt.splice(0,i);
state.emitedh.splice(0,i);
//state.emitedd.splice(0,i);
}
if(zmqdedupprint) console.log("dedupreceive ",client,hash,data);
return JSON.parse(data);
}
else
{
if(zmqdedupprint) console.log('dedupreceive dup ',client,hash,data);
return undefined;
}
}
//example
//var receivestate1={emitedh:[],emitedt:[],emitedd:[]}
//zmqsocket.on("message", function(str)
//{
// var re=dedupreceive(str.toString(),receivestate1);
// if(re!==undefined) console.log(re);
//});
//example2: other possible use localy
//
//var receivestate1={emitedh:[],emitedt:[],emitedd:[]}
////var sendstates={};
//function onrow(clientid,object)
//{
// if(!(clientid in sendstates))sendstates[clientid]={prev_send:null,count_send:0};
// var sendstate1=sendstates[clientid]
// var re=dedupreceive(zmqdedupsend(object,sendstate1),receivestate1);
// if(re!==undefined) console.log(re);
// else console.log('duplicate' object);
//};
//////gosip discovery
function getExternalIp() {
var ifconfig = require('os').networkInterfaces();
var device, i, I, protocol;
for (device in ifconfig) {
// ignore network loopback interface
if (device.indexOf('lo') !== -1 || !ifconfig.hasOwnProperty(device)) {
continue;
}
for (i=0, I=ifconfig[device].length; i<I; i++) {
protocol = ifconfig[device][i];
// filter for external IPv4 addresses
if (protocol.family === 'IPv4' && protocol.internal === false) {
//console.log('found', protocol.address);
return protocol.address;
}
}
}
console.log('External Ip Not found!');
return '127.0.0.1';
}
zmq_getExternalIp=getExternalIp;
function handleexit(cb)
{
//zmq_gossip_remove_all()
zmq_telepathine_self.say("bye");
setTimeout(function(){ if(cb)cb(); },3000)//Telepathine heartBeatIntervalMS + little
}
function handleexit_at_process_on(signal)
{
process.on(signal,function () {
console.log('Got '+signal+', will exit in 10 seconds ');
var num=1;
var n=setInterval(function(){ console.log(num); num++; },1000);
var c=setTimeout(function(){ if(n)clearTimeout(n); process.exit(0); },10000)
handleexit(function(){
if(c)clearTimeout(c);
if(n)clearTimeout(n);
process.exit(0);
})
});
}
handleexit_at_process_on('SIGTERM');
handleexit_at_process_on('SIGINT');
var net=require('net')
function getPort (portrange,cb) {
var server = net.createServer()
server.listen(portrange, function (err) {
server.once('close', function () {
cb(portrange)
})
server.close()
})
server.on('error', function (err) {
getPort(portrange+1,cb)
})
}
zmq_getPort=getPort;
var Telepathine = require('telepathine').Telepathine;
zmq_telepathine=null
zmq_telepathine_self=null
zmq_telepathine_start=function(toipandport)
{
var options = {
gossipIntervalMS: 2500,
heartBeatIntervalMS: 2500,
address: getExternalIp(),
addressMap: {'127.0.0.1': getExternalIp() }
};
// Create peers and point them at the seed
// Usually this would happen in separate processes.
// To prevent a network's single point of failure, design with multiple seeds.
getPort(5000,function(port){
var autoports=[];
if(port!=5000)autoports.push("127.0.0.1:"+5000)
//if(port!=5001)autoports.push("127.0.0.1:"+5001)
//if(port!=5002)autoports.push("127.0.0.1:"+5002)
//if(port!=5003)autoports.push("127.0.0.1:"+5003)
var a=zmq_telepathine=new Telepathine(port,!toipandport?autoports:[toipandport ], options);
a.on('start', function (self) {
a.hear('componentonline', function (data, fromPeer)
{
zmq_telepathine_process_add(data);
console.log('hear componentonline received ', this.event, '=', data, 'from', fromPeer);
});
a.hear('componentoffline', function (data, fromPeer)
{
console.log('hear componentonline received ', this.event, '=', data, 'from', fromPeer);
});
a.hear('bye', function (data, fromPeer)
{
console.log('hear bye received ', this.event, '=', data, 'from', fromPeer);
setTimeout(function (){ a.peers[fromPeer].alive=false;},10);
});
a.believe('*', function (peer, k,v,expiresAt) {
console.log(peer + " after believe of "+k+ ' with value '+v);
console.log('a me '+a.peer_name + " get somekey=" + a.get('somekey'),' ~=',v,'peer=',peer,a.peers[peer]?a.getRemote(peer, k):'no remote');
});
//convenience method for key change events, using wildcard
a.know('*', function (peer,k, v) {
console.log(this.peer_name + " knows via know('*'.. that peer " + peer + " set " + this.event + "=" + v);
});
a.on('peer:new', function(peerstate) { console.log( 'peer discovered',peerstate); })
a.on('peer:start', function(peer_name) {console.log( 'peer seems alive - peer start',peer_name); })
a.on('peer:stop', function(peer_name) {console.log( 'peer seems dead - peer stop',peer_name);})
zmq_telepathine_self=self;
console.log('zmq_telepathine on port '+port);
if(zmq_telepathine_start)zmq_telepathine_start();
});
a.start();
})
zmq_telepathine_connect=function(toipandport)
{
if (toipandport === zmq_telepathine_connect.peer_name) console.log( new Error('cannot specify self as seed').stack);
var x={};x[toipandport]=undefined;
zmq_telepathine.handleNewPeers(x);
}
}
zmq_telepathine_process_add=function(c)
{
var acs=zmq_telepathine_added_components;
for(var i=0;i<acs.length;i++)
{
//seach in any my component needs this new component
var cc=acs[i];
if(c.name in cc.inputs)
{
console.log(''+cc.name+' discoverd that a newly added component '+c.name+' may need '+cc.name+' - say '+cc.name+' is online to all')
//reply only to him i am here
zmq_telepathine.say('componentonline',cc);
zmq_telepathine_process_add(JSON.parse(JSON.stringify(cc)));
}
if(cc.name in c.inputs)
{
if(!cc.isconnected(c))
{
console.log(''+cc.name+' discoverd it can connect to '+c.name+' - tring to connect')
cc.connect(c);
}
else
console.log(''+cc.name+' rediscoverd it can connect to '+c.name+' - already connected')
}
}
}
/*
var example_component_description={
name:'announcer',
inputs: {
//'announcer':{'zmqport':zmqport.replace(/\*|127.0.0.1/,getExternalIp()}
},
//output not neded only needed to detect unconnected fully components which is rare case
//outputs:{'dbinserter':{'zmqport':zmqport.replace(/\*|127.0.0.1/,getExternalIp()},
// 'dataprocessor':{'zmqport':zmqport.replace(/\*|127.0.0.1/,getExternalIp()}
// }
};
*/
zmq_telepathine_added_components=[];
zmq_telepathine_addcomponent=function(component_description)
{
if(zmq_telepathine===null)
return setTimeout(function(){zmq_telepathine_addcomponent(component_description)},500);
zmq_telepathine_added_components.push(component_description);
zmq_telepathine.say('componentonline',component_description);
zmq_telepathine_process_add(component_description);
//zmq_mesh.send('componentonline', );
}
//zmq_telepathine_start();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment