Skip to content

Instantly share code, notes, and snippets.

@totherik
Last active August 29, 2015 14:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save totherik/f4d56ac4a83f11585e32 to your computer and use it in GitHub Desktop.
Save totherik/f4d56ac4a83f11585e32 to your computer and use it in GitHub Desktop.
Levee Emitter Wrapper
'use strict';
var Net = require('net');
var Promise = require('promise');
var Levee = require('levee');
// Experimental emitter wrapper for Levee circuits.
Levee.wrapEmitter = function wrapEmitter(factory, execute) {
return {
factory: factory,
execute: execute,
emitter: null,
promise: null,
run: function (context, cb) {
var self;
self = this;
self._create(function (err, ee) {
if (err) {
cb(err);
return;
}
self.execute.call(ee, context, cb);
});
},
_create: function (cb) {
var self = this;
if (self.emitter) {
cb(null, self.emitter);
return;
}
self.promise = self.promise || new Promise(function (resolve, reject) {
self.factory(function (err, ee) {
self.emitter = null;
self.promise = null;
if (err) {
reject(err);
return;
}
ee.on('error', function () {
self.emitter = null;
});
resolve(self.emitter = ee);
});
});
self.promise.done(function (ee) {
cb(null, ee);
}, cb);
}
};
};
// Socket test impl for emitter wrapper
var socket;
// All wrapped emitters are build via a factory. This allows reconstruction
// of things like sockets when unrecoverable errors occur.
function factory(cb) {
function onerror(err) {
console.log('disconnected');
socket.destroy();
socket = null;
setImmediate(function () {
cb(err);
});
}
if (socket && !socket.writable) {
console.log('reconnect');
socket.destroy();
socket = null;
factory(cb);
return;
}
if (!socket) {
console.log('create');
socket = new Net.Socket();
socket.once('error', onerror);
socket.connect(24224, 'localhost', function () {
console.log('connected');
this.removeListener('error', onerror);
cb(null, socket);
});
return;
}
console.log('connected');
setImmediate(function () {
cb(null, socket);
});
}
// The execute impl similar to existing Levee API. `this` in this context
// is the emitter itself.
function execute(context, cb) {
console.log('exec');
this.write(context, cb);
}
// Create our emitter wrapper.
var emitter = Levee.wrapEmitter(factory, execute);
var breaker = Levee.createBreaker(function (context, cb) {
emitter.run(context, cb);
}, { resetTimeout: 5000 });
// Kick off the test
(function run() {
breaker.run('foobar', function (err) {
console.log(err || 'ok');
setTimeout(run, 500);
});
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment