Skip to content

Instantly share code, notes, and snippets.

@estliberitas
Last active August 29, 2015 14:15
Show Gist options
  • Save estliberitas/d7183c5d017e7fce0356 to your computer and use it in GitHub Desktop.
Save estliberitas/d7183c5d017e7fce0356 to your computer and use it in GitHub Desktop.
Simple pool
'use strict';
module.exports = pool;
function pool(options) {
var instances = options.config.map(function() {
return null;
});
function noop() {
}
function acquireInstance(callback) {
var emptyIdx = indexOfEmpty();
var instance;
if (emptyIdx !== -1) {
initializeInstance(emptyIdx, markAcquired);
}
else if (instance = getIdle()) {
markAcquired(null, instance);
}
else {
instance = getLeastLoaded();
instance.queue.push(callback);
}
function markAcquired(err, instance) {
if (err) {
callback(err);
}
else {
instance.acquired = true;
callback(null, instance.object);
}
}
}
function getByObject(object) {
var i = 0;
var instance;
while (instance = instances[i++]) {
if (instance.object === object) {
return instance;
}
}
return null;
}
function getIdle() {
var i = 0;
var instance;
while (instance = instances[i++]) {
if (instance.acquired === false) {
return instance;
}
}
return null;
}
function getLeastLoaded() {
var i = 0;
var instance;
var queueLength;
var minQueueLength = instances[0].queue.length;
var minIdx = 0;
while (instance = instances[i]) {
queueLength = instance.queue.length;
if (queueLength < minQueueLength) {
minIdx = i;
minQueueLength = queueLength;
}
i++;
}
return instances[minIdx];
}
function indexOfEmpty() {
var i = 0;
var length = instances.length;
while (i < length) {
if (instances[i] === null) {
return i;
}
i++;
}
return -1;
}
function initializeInstance(idx, callback) {
var config = options.config[idx];
options.create(config, afterCreate);
function afterCreate(err, object) {
if (err) {
return callback(err);
}
var instance = {
acquired: false,
object: object,
queue: []
};
instances[idx] = instance;
callback(null, instance);
}
}
function releaseInstance(object, callback) {
callback = callback || noop;
var instance = getByObject(object);
if (!instance) {
return callback(new Error('Unknown object'));
}
else if (!instance.queue.length) {
instance.acquired = false;
callback();
}
else {
setImmediate(instance.queue.shift(), null, object);
callback();
}
}
function destroyInstance(object, callback) {
callback = callback || noop;
var instance = getByObject(object);
if (!instance) {
return callback(new Error('Unknown object'));
}
var idx = instances.indexOf(instance);
instances[idx] = null;
if (options.destroy) {
options.destroy(object);
}
var queue = instance.queue;
var cb;
if (queue.length) {
// balance remaining acquire requests
while (cb = queue.shift()) {
acquireInstance(cb);
}
}
}
return {
acquire: acquireInstance,
release: releaseInstance,
destroy: destroyInstance
}
}
/** Example usage */
var testPool = pool({
config: [
{
host: '127.0.0.1',
port: 9092
},
{
host: '127.0.0.1',
port: 9093
}
],
create: function(config, callback) {
callback(null, {
options: config,
send: function(data, callback) {
console.info('sending data to %s:%d: %s', config.host, config.port, data);
var delay = Math.round(1000 * Math.random());
setTimeout(callback, delay);
}
});
},
destroy: function(instance) {
// instance.close();
}
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment