Skip to content

Instantly share code, notes, and snippets.

@alexstrat
Created May 23, 2012 15:03
Show Gist options
  • Select an option

  • Save alexstrat/2775744 to your computer and use it in GitHub Desktop.

Select an option

Save alexstrat/2775744 to your computer and use it in GitHub Desktop.
var StateEventEmitter = require('./util/state-eventemitter'),
Deferred = require('./util/deferred'),
Crypto = require('./util/crypto'),
PeerArray = require('./util/peerarray'),
XORSortedPeerArray = require('./util/xorsorted-peerarray'),
IterativeDeferred = require('./util/iterative-deferred'),
globals = require('./globals.js'),
RoutingTable = require('./dht/routing-table'),
Peer = require('./dht/peer'),
BootstrapPeer = require('./dht/bootstrap-peer'),
Reactor = require('./network/reactor'),
PingRPC = require('./network/rpc/ping'),
FindNodeRPC = require('./network/rpc/findnode'),
FindValueRPC = require('./network/rpc/findvalue'),
StoreRPC = require('./network/rpc/store'),
ValueManagement = require('./data/value-store');
var Node = module.exports = Peer.extend({
/**
* TODO : explicit the options..
*
*
* @param {[type]} id [description]
* @param {[type]} options [description]
* @return {[type]}
*/
initialize: function(id, options) {
// extends Peer
this.supr('non-defined', id || Crypto.digest.randomSHA1());
//implements StateEventEmitter
for (var fn in StateEventEmitter.prototype) {
if (fn !== 'initialize') this[fn] = StateEventEmitter.prototype[fn];
}
StateEventEmitter.prototype.initialize.call(this);
this.setState('initializing');
// store config
var config = this.config = {};
for (var option in options) {
config[option] = options[option];
}
// extracts bootstraps from the config object
if (!Array.isArray(config.bootstraps) || config.bootstraps.length === 0) {
throw new Error('no bootstrap to join the network');
} else {
this._bootstraps = config.bootstraps.map(function(address) {
return new BootstrapPeer(address);
});
}
// instantiate a routing table and listen to it
this._routingTable = new RoutingTable(this, config.routing_table);
this._routingTable.on(this.routingTableEvents, this);
// instantiate a reactor and listen to it
this._reactor = new Reactor(this, config.reactor);
this._reactor.on(this.reactorEvents, this);
this.setState('initialized');
},
/**
* Connect method : make the reactor connect.
* @public
*
* @param {Function} [callback] - callback to be called when connected
* @param {Object} [context = this] - context of the callback
* @return {self}
*/
connect: function(callback, context) {
if (this.stateIsNot('connected')) {
if (callback) {
this.once('connected', callback, context || this);
}
this._reactor.connectTransport();
}
return this;
},
/**
* Disconnect method : make the reactor disconnect.
* @public
*
* @param {Function} [callback] - callback to be called when connected
* @param {Object} [context = this] - context of the callback
* @return {self}
*/
disconnect: function(callback, context) {
if (this.stateIsNot('disconnected')) {
if (callback) {
this.once('disconnected', callback, context || this);
}
this._routingTable.stop();
this._reactor.disconnectTransport();
}
return this;
},
/**
* Joining process : do an iterative find node on our own ID,
* startying by contacting the peers passed as bootstraps.
* @public
*
* TODO : expose a public API.
*
* @param {Function} callback - called when bootstraps process ends
* @param {Object} context - context of the callback
*
* @return {self} this
*/
join: function(callback, context) {
// lookup process
var startLookup = function() {
this.emit('joining');
return this.iterativeFindNode(this);
};
var noBootstrap = function() {
return new Error('no bootstrap');
};
// joining result
var success = function() {
this.emit('joined');
};
var failure = function() {
this.emit('join failed');
};
//ping the bootstraps
var pings = this._bootstraps.map(function(peer) {
return new PingRPC(peer);
});
this._reactor.sendRPC(pings);
context = context || this;
Deferred.whenAtLeast(pings)
.pipe(startLookup, noBootstrap, this)
.then(success, failure, this)
.then(callback, callback, context);
return this;
},
/**
* Get a value on the DHT providing the associated key.
*
* @public
* Public wrapper around #iterativeFindValue.
*
* Provided callback will be called with the found value
* or with null if not found.
*
* @param {String} key - key to find
* @param {Function} callback - function to be called at end
* @param {Object} [context] - context of callback
* @return {self}
*/
get: function(key, callback, context) {
context = context || this;
this.iterativeFindValue(key).then(
function(kv) {
callback.call(context, kv.value);
}, function() {
callback.call(context, null);
});
return this;
},
/**
* Put a given value on the DHT associated to the given key and
* with the given expiration time.
*
* @public
* Public wrapper around #iterativeStore.
*
* If the given key is `null` the associated key is set to the SHA1 of
* the value. Expiration time is not mandatory and set to infinite
* by default.
*
* An optional callback (with a context) can be provided and will be
* called with :
* - 1st parameter : key associated to the value on the DHT
* - 2nd parameter : number of peers that successfully stored the
* value. If 0, the process has failed.
*
* @param {String || null} key - key or null if value by default SHA1(value)
* @param {*} value - value to store on the DHT
* @param {Date || Number} [key] - date of expiration of the key/value
* @param {Function} [callback] - callback when the store process ends
* @param {Object} [context] - context of callback
* @return {self}
*/
put: function(key, value, exp, callback, context) {
// if no exp, arguments sliding
if (typeof exp == 'function') {
exp = undefined;
callback = exp;
context = callback;
}
// default values
key = key || Crypto.digest.SHA1(String(value));
exp = exp || -1;
context = context || this;
this.iterativeStore(key, value, exp)
.then(function(key, peers) {
if (callback) callback.call(context, key, peers.size());
}, function() {
if (callback) callback.call(context, null, 0);
});
return this;
},
//# INTERNAL EVENTS HANDLING
/**
* Reactions to events coming from the reactor.
* @type {Object}
*/
reactorEvents : {
/**
* On `connected` event :
* - save our address on the network
* - state is `connected`
*
* @param {String} address - Address of the node on the network
*/
connected: function(address) {
this.setAddress(address);
if (typeof this._store == 'undefined') {
this._store = new ValueManagement(this, this.config.value_management);
this._store.on(this.VMEvents, this);
}
this.setState('connected');
},
/**
* On `disconnected`, state becomes `disconnected`.
*/
disconnected: function() {
this.setState('disconnected');
},
/**
* On `reached` peer, add it to routing table.
*
* @param {Peer} peer - Peer that had been reached.
*/
reached: function(peer) {
peer.touch();
this._routingTable.addPeer(peer);
},
/**
* On `queried` (means we received a RPC request), call
* the appopriate method to fullfill the RPC.
* @see #handle+`method_name`
*
* @param {RPC} rpc - The received rpc
*/
queried: function(rpc) {
if (!rpc.inProgress())
return;
this['handle' + rpc.getMethod()].call(this, rpc);
},
/**
* On `outdated` (means we received a response from a peer
* which ID seems to be different from the one in the routing table),
* update the ID in the routing table.
*
* @param {Peer} peer - outdated peer
* @param {String} id - new id
*/
outdated: function(peer, id) {
this._routingTable.removePeer(peer);
peer.setID(id);
this._routingTable.addPeer(peer);
}
},
/**
* Handle an incoming PING RPC request :
* simply respond to it.
*
* @param {PingRPC} rpc - the incoming RPC object
*/
handlePING: function(rpc) {
rpc.resolve();
},
/**
* Handle an incoming FIND_NODE RPC request :
* fetch from the routing table the BETA closest
* peers (except the querying peer) to the
* targeted ID and respond to the rpc.
*
* @param {FindNodeRPC} rpc - the inconming rpc object
*/
handleFIND_NODE: function(rpc) {
rpc.resolve(this._routingTable.getClosePeers(rpc.getTarget(), globals.BETA, rpc.getQuerying()));
},
/**
* Handle an incoming FIND_VALUE request:
* - if we got the value, respond it.
* - if not, fetch the BETA closest peer, respond them.
*
* @param {FindValueRPC} rpc - the incoming rpc oject
*/
handleFIND_VALUE: function(rpc) {
this._store.retrieve(rpc.getTarget())
.then(function(value, exp) {
rpc.resolve({value : value, exp : exp}, true);
}, function() {
rpc.resolve(this._routingTable.getClosePeers(rpc.getTarget(), globals.BETA, rpc.getQuerying()), false);
}, this);
},
/**
* Handle an incoming STORE request :
* store the value in ValueManagement and respond.
*
* @param {StoreRPC} rpc - the incoming rpc object.
*/
handleSTORE: function(rpc) {
this._store.save(rpc.getKey(), rpc.getValue(), rpc.getExpiration())
.then(rpc.resolve, rpc.reject, rpc);
},
/**
* Reactions to events comming from the routing table.
* @type {Object}
*/
routingTableEvents : {
/**
* On `refresh` (means a kbucket has not seen any fresh
* peers for a REFRESH_TIMEOUT time), do an titerative find node
* on a random ID in the KBucket range.
*
* @param {KBucket} kbucket - Kbucket needing to be refreshed
*/
refresh: function(kbucket) {
var random_sha = Crypto.digest.randomSHA1(this.getID(), kbucket.getRange());
this.iterativeFindNode(random_sha);
}
},
/**
* Reactions to event coming from the value management.
* @type {Object}
*/
VMEvents : {
/**
* On `republish` (means a key-value needs to be republished
* on the network) : do an iterative store on it.
*
* @param {String} key - key
* @param {Object} value - value
* @param {Date | Number} exp - expiration date
*/
republish: function(key, value, exp) {
this.iterativeStore(key, value, exp);
}
},
//# ITERATIVE PROCESSES
/**
* Launch an iterative find node process.
*
* Return a deffered object :
* - resolve :
* {Peer} peer - peer found that have the targeted ID
* {PeerArray} peers - reached peers during the iterative process
* - reject :
* {PeerArray} peers - reached peers during the iterative process
*
* @param {Peer | String} peer - Peer or Node ID to find.
* @return {Deferred}
*/
iterativeFindNode: function(target) {
target = (target instanceof Peer) ? target.getID() : target;
var send = this.send(),
close = this._routingTable.getClosePeers(target, globals.K),
init = new XORSortedPeerArray(close, target),
lookup = new IterativeDeferred(init),
staled = false;
function map(peer) {
var rpc = new FindNodeRPC(peer, target);
send(rpc);
return rpc;
}
function reduce(peers, newPeers, map) {
peers.add(newPeers);
if (peers.newClosestIndex() >= 0 && peers.newClosestIndex() < globals.ALPHA) {
peers.first(globals.ALPHA, map);
}
return peers;
}
function end(peers, map, reached) {
if (staled) {
lookup.reject(new XORSortedPeerArray(reached, target));
return;
}
if (reached.length <= globals.ALPHA && peers.size() > 0) {
staled = true;
peers.first(globals.K, map);
} else {
lookup.resolve(new XORSortedPeerArray(reached, target));
}
}
// -- UI HACK
lookup._target = target;
this.emit('iterativeFindNode', lookup, close);
return lookup
.map(map)
.reduce(reduce, init)
.end(end);
},
/**
* Launch an iterative find value process.
*
* If succeed, it STOREs the value to the
* closest reached peer which didn't responded
* the value.
*
* Return a deffered object :
* - resolve :
* {Object} keyValue - properties :
* `value` - the retrieved value
* `exp` - the expiration date
* {PeerArray} peers - reached peers during the iterative process
* - reject
* {PeerArray} peers - reached peers during the iterative process
*
* @param {String} key - targeted ID
* @return {Deferred}
*/
iterativeFindValue: function(key) {
if (!globals.REGEX_NODE_ID.test(key)) {
throw new TypeError('non valid key');
}
var send = this.send(),
close = this._routingTable.getClosePeers(key, globals.K),
init = new XORSortedPeerArray(close, key),
lookup = new IterativeDeferred(init),
staled = false;
function map(peer) {
var rpc = new FindValueRPC(peer, key);
send(rpc);
return rpc;
}
function reduce(peers, result, found, map, queried, reached) {
if (found) {
var index = (peers.newClosestIndex() > 0) ? 0 : 1;
var rpc = new StoreRPC(peers.getPeer(index), key, result.value, result.exp);
send(rpc);
lookup.resolve(result, new XORSortedPeerArray(reached, key));
} else {
peers.add(result);
if(peers.newClosestIndex() >= 0 && peers.newClosestIndex() < globals.ALPHA) {
peers.first(globals.ALPHA, map);
}
}
return peers;
}
function end(peers, map, reached) {
lookup.reject(new XORSortedPeerArray(reached, key));
}
// -- UI HACK
lookup._target = key;
this.emit('iterativeFindValue', lookup, close);
return lookup
.map(map)
.reduce(reduce, init)
.end(end);
},
/**
* Launch an iterative store process :
* do an iterative find node on the key,
* and send STORE RPCsto the K closest
* reached peers.
*
* Return a deferred object that is resolved when
* at least one of the store RPC is resolved :
* - resolve :
* {String} key - key with wihich the value was stored
* {PeerArray} peers - peers that resolved the store RPC
* {PeerArray} peers_not - peers that did not resolved the store RPC
* - reject
* {PeerArray} peers_not - peers that did not resolved the store RPC
*
* @param {String} key - key
* @param {*} value - value to store on the network
* @param {Date | Integer} [exp = never] - experiation date of the value
* @return {Deferred}
*/
iterativeStore: function(key, value, exp) {
if (!globals.REGEX_NODE_ID.test(key)) {
throw new TypeError('non valid key');
}
function querieds(rpcs) {
return new PeerArray(rpcs.map(function(rpc) {
return rpc.getQueried();
}));
}
var def = new Deferred(),
send = this.send();
var stores = function(peers) {
var targets = peers.first(globals.K);
var rpcs = targets.map(function(peer) {
return send(new StoreRPC(peer, key, value, exp));
});
Deferred.whenAtLeast(rpcs, 1)
.then(function(stored, notStored) {
def.resolve(key, querieds(stored), querieds(notStored));
}, function(stored, notStored) {
def.reject(querieds(notStored));
});
};
this.iterativeFindNode(key)
.then(stores, function() { def.reject(new PeerArray()); });
return def;
},
/**
* Closure to proxy the reactor send method
*/
send: function() {
var reactor = this._reactor;
return function() {
return reactor.sendRPC.apply(reactor, arguments);
};
}
});
var StateEventEmitter = require('../util/state-eventemitter'),
globals = require('../globals'),
protocol = require('./protocol'),
Transport = require('./transport'),
PingRPC = require('./rpc/ping'),
FindNodeRPC = require('./rpc/findnode'),
FindValueRPC = require('./rpc/findvalue'),
StoreRPC = require('./rpc/store'),
RPC = require('./rpc/rpc'),
log = require('../logging').ns('Reactor');
var Reactor = module.exports = StateEventEmitter.extend({
/**
* TODO : explicit the options
*
* @param {Node} node - the Node Instance to which this reactor is associated
* @param {Object} options - options
*/
initialize: function(node, options) {
this.supr();
this._node = node;
// load config
var config = this.config = {
protocol : globals.PROTOCOL,
cleanup : globals.CLEANUP_INTERVAL,
adaptiveTimeout : globals.ADAPTIVE_TIMEOUT_INTERVAL
};
for (var option in options) {
this.config[option] = options[option];
}
if (!protocol.hasOwnProperty(config.protocol)) throw new Error('non defined protocol');
// instantiate the transport and protocol
this._protocol = protocol[config.protocol];
this._transport = new Transport(
config.host,
config.transport
);
// request table and ragular clean up the table
this._requests = {};
this._startCleanup();
this._rtts = [];
if (!!config.adaptiveTimeout) {
this._startAdaptiveTimeout();
}
// associate RPC object to RPC methods
this.RPCObject = {
PING : PingRPC.extend( {reactor : this}),
FIND_NODE : FindNodeRPC.extend( {reactor : this}),
FIND_VALUE : FindValueRPC.extend({reactor : this}),
STORE : StoreRPC.extend( {reactor : this}),
__default : undefined
};
this.setState('disconnected');
},
/**
* Stop the reactor:
* stop clean-up process and disconnect transport.
*/
stop: function() {
this._stopCleanup();
this._stopAdaptiveTimeout();
this.disconnectTransport();
},
/**
* Return the node instance (also a Peer instance).
* @return {Object} node instance
*/
getMeAsPeer: function() {
return this._node;
},
/**
* Connect the transport.
*/
connectTransport: function() {
if (this._transport.stateIsNot('connected')) {
this._transport.once('connected', function(address) {
// main listen loop
this._transport.listen(this.handleRPCMessage, this);
this._startCleanup();
this.setState('connected', address);
}, this);
this._transport.connect();
}
return this;
},
/**
* Disconnect the transport.
* @return {[type]}
*/
disconnectTransport: function() {
if (this._transport.stateIsNot('disconnected')) {
this._transport.once('disconnected', function() {
this.setState('disconnected');
}, this);
this._transport.disconnect();
}
return this;
},
/**
* Send a RPC query : add it to the requests table and pass it
* to #sendNormalizedQuery.
*
* @param {RPC} rpc - rpc to send
*/
sendRPCQuery: function(rpc) {
if (this.stateIsNot('connected')) {
rpc.reject('transport not connected');
log.error('send query : transport disconnected', rpc);
}
else {
this._storeRequest(rpc);
this.sendNormalizedQuery(rpc.normalizeQuery(), rpc.getQueried(), rpc);
log.debug('Reactor', 'send query', rpc.getMethod(), rpc.getQueried().getAddress(), rpc.normalizeQuery());
}
this.emit('querying', rpc);
return this;
},
/**
* Encode a normalised query whith the appropriate protcol,
* and send it.
*
* @param {Object} query - normalized query
* @param {Peer} dst_peer - destination peer
*/
sendNormalizedQuery: function(query, dst_peer) {
var req = this._protocol.buildRequest(query.method, query.params);
req.setRPCID(query.id);
this._transport.send(dst_peer.getAddress(), req);
},
/**
* Send a RPC response.
* @param {RPC} rpc - RPC object to send.
*/
sendRPCResponse: function(rpc) {
if (this.stateIsNot('connected')) {
rpc.reject('transport not connected');
log.error('send response : transport disconnected', rpc);
} else {
this.sendNormalizedResponse(rpc.normalizeResponse(), rpc.getQuerying(), rpc);
log.debug('send response', rpc.getMethod(), rpc.getQuerying().getAddress(), rpc.normalizeResponse());
}
return this;
},
/**
* Encode a normalised query whith the appropriate protcol,
* and send it.
*
* @param {Object} response - normalized query
* @param {Peer} dst_peer - destination peer
*/
sendNormalizedResponse: function(response, dst_peer) {
var prot = this._protocol,
res = (response.hasOwnProperty('result')) ?
prot.buildResponse(response.result, response.id) :
prot.buildErrorResponse(prot.buildInternalRPCError(response.error), response.id);
this._transport.send(dst_peer.getAddress(), res);
},
/**
* Handle an incoming encoded RPC message :
* normalyse the message and pass it to the right handler.
*
* @param {Object} data - raw data
*/
handleRPCMessage: function(data) {
var message;
try {
message = this._protocol.parseRPCMessage(data.msg);
}
catch(RPCError) {
log.warn('received a broken RPC message', RPCError);
return;
}
if (message.isRequest()) {
this.handleNormalizedQuery({
id : message.getRPCID(),
method : message.getMethod(),
params : message.getParams()
}, data.src);
} else if (message.isResponse()) {
this.handleNormalizedResponse({
id : message.getRPCID(),
result : message.getResult()
}, data.src);
} else if (message.isError()) {
this.handleNormalizedResponse({
id : message.getRPCID(),
error : message.getError()
}, data.src);
}
},
/**
* Handle a normalized query : construct the associated RPC object,
* and emit `queried` wiht the object. Bind the resolve or reject for
* sending the response.
*
* @param {Object} query - normalized query
* @param {String} from - address of the querying peer
*/
handleNormalizedQuery: function(query, from) {
var method = (this.RPCObject.hasOwnProperty(query.method)) ? query.method : '__default';
if (!this.RPCObject[method]) {
log.warn( 'receive query with method "' + query.method + '" not available');
return;
}
//crate the appropirate RPC object
var rpc = new this.RPCObject[method]();
rpc.handleNormalizedQuery(query, from);
//when resolved or rejected, send response
rpc.always(rpc.sendResponse);
//handler could have rejected the query
if (!rpc.isRejected()) {
this.emit('reached', rpc.getQuerying());
log.debug( 'received query', rpc.getMethod(), from, query);
this.emit('queried', rpc);
}
},
/**
* Handle a normalized response : find the associated RPC
* object (correspond to the rpc id) and pass to it.
* @param {Object} response - normalized response
* @param {String} from - address of the peer that responded
*/
handleNormalizedResponse: function(response, from) {
var rpc = this._getRequestByID(response.id);
if (!rpc) {
log.warn( 'response matches no request', from, response);
} else {
log.debug( 'received response', rpc.getMethod(), from, response);
rpc.handleNormalizedResponse(response, from);
}
return this;
},
/**
* Find a request in the requests table given its rpc id.
* @param {String} id - rpc id
*/
_getRequestByID: function(id) {
return this._requests[id];
},
/**
* Store the request in he table.
* @param {RPC} rpc - rpc to store
*/
_storeRequest: function(rpc) {
this._requests[rpc.getID()] = rpc;
},
/**
* Periodicly remove the stored requests already completed.
*/
_startCleanup: function() {
this._cleanupProcess = setInterval(function(self) {
var requests = self._requests;
for (var id in requests) {
if (requests.hasOwnProperty(id)) {
if (requests[id].isCompleted())
delete requests[id];
}
}
}, this.config.cleanup, this);
},
/**
* Stop the periodic cleanup.
*/
_stopCleanup: function() {
clearInterval(this._cleanupProcess);
},
//helpers :
/**
* kethod to send a rpc.
*
* @param {RPC | Array<RPC>} rpc - rpc to send
*/
sendRPC: function(rpc) {
//an array of RPCs
if(Array.isArray(rpc)) {
for(var i = 0; i < rpc.length; i++) {
this.sendRPC(rpc[i]);
}
return this;
}
//pass instace of reactor
rpc.reactor = this;
rpc.setQuerying(this.getMeAsPeer());
var success = function() {
// emit the reach as the first event
this.emit('reached', rpc.getQueried());
};
var failure = function(type) {
if (type === 'outdated') {
// forward outdated events
this.emit.apply(this, arguments);
}
};
rpc.then(success, failure, this);
return rpc.sendQuery();
},
//
// Statistics
//
timeoutValue: globals.TIMEOUT_RPC,
distributionSize: 3000,
faultTolerance: 0.75,
_minTimeout: 1000,
_maxTimeout: 10 * 1000,
addRTT: function(rtt) {
this._rtts.push(rtt);
},
_startAdaptiveTimeout: function() {
var self = this;
this._adaptiveTimeoutID = setInterval(function() {
self._adaptiveTimeout();
}, this.config.adaptiveTimeout);
},
_stopAdaptiveTimeout: function() {
clearInterval(this._adaptiveTimeoutID);
},
/**
* Implements the algorithm to compute a
* long-term-adaptive-timeout value
*/
_adaptiveTimeout: function() {
if (this._rtts.length > this.distributionSize) {
this._rtts = this._rtts.slice(this._rtts.length - this.distributionSize);
}
this.timeoutValue = this._faultToleranceAdaptiveTimeout();
},
_faultToleranceAdaptiveTimeout: function() {
var buff = this._rtts.slice().sort(function(a, b) { return a - b; });
var i = Math.round(buff.length * this.faultTolerance) - 1;
if (i < buff.length - 1) {
return Math.max(Math.min(buff[i], this._maxTimeout), this._minTimeout);
}
return globals.TIMEOUT_RPC;
}
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment