Skip to content

Instantly share code, notes, and snippets.

@odf
Last active August 29, 2015 14:05
Show Gist options
  • Save odf/ff094c8c796883fba06b to your computer and use it in GitHub Desktop.
Save odf/ff094c8c796883fba06b to your computer and use it in GitHub Desktop.
Asynchronous Flux-like dispatcher with deadlock prevention via Go-style CSP
// Asynchronous Flux-like dispatcher with deadlock prevention via Go-style CSP.
//
// - my libraries ceci-core and ceci-channels are available from npm
// - pre-compile this file with regenerator
// - build with webpack or browserify
'use strict';
var ceci = require('ceci-core');
var chan = require('ceci-channels');
// Clones an object, mapping values via the function fn.
//
// obj: the object to copy; only its own enumerable properties are visited,
//      inherited ones are skipped.
// fn:  called as fn(key, value) for every visited property; whatever it
//      returns is stored under the same key in the clone.
//
// Returns a new plain object; mapObject itself never writes to obj.
var mapObject = function(obj, fn) {
  // Borrow hasOwnProperty from Object.prototype rather than calling it on
  // obj: obj may not have the method at all (Object.create(null)) or may
  // shadow it with an own property that happens to be called hasOwnProperty.
  var hasOwn = Object.prototype.hasOwnProperty;
  var result = {}, key;

  for (key in obj) {
    if (hasOwn.call(obj, key)) {
      result[key] = fn(key, obj[key]);
    }
  }

  return result;
};
// exception-handling variant for top-level go blocks
//
// Starts task via ceci.go and attaches a rejection handler that reports the
// failure on console.error. Nothing is returned, so the caller cannot wait
// for the task or observe its result.
var detach = function(task) {
  ceci.go(task).then(null, function(ex) {
    // The rejection reason is not guaranteed to be an Error: guard the
    // property access (null/undefined would throw here) and do not append
    // an "undefined" line for values that carry no stack.
    var hasStack = ex != null && ex.stack;
    console.error(hasStack ? ex + '\n' + ex.stack : String(ex));
  });
};
// creates and runs a dispatcher
//
// The returned object has three methods -- subscribe(id, handler),
// unsubscribe(id) and dispatch(payload). Each of them only pushes a request
// onto an internal queue and returns the result of that push. A single loop
// pulls the requests off the queue in order and yields on every dispatch
// before it pulls the next request.
//
// A handler is called as handler(payload, waitFor) and its return value is
// yielded inside a go block. waitFor(otherId) returns a task that completes
// once subscriber otherId is done with the current payload; requests that
// name an unknown subscriber or that would close a cycle of waits are
// ignored with a warning on console.error.
var dispatcher = function() {
  var hasOwn = Object.prototype.hasOwnProperty;
  var queue = chan.chan(50); // FIFO queue for all calls to dispatcher
  var clients = {}; // subscribed callbacks

  // subscribes a client; id must be a unique number or string
  var subscribe = function(id, handler) {
    clients[id] = handler;
  };

  // unsubscribes a client
  var unsubscribe = function(id) {
    delete clients[id];
  };

  // dispatches a payload to all subscribed clients
  var dispatch = function(payload) {
    // who is currently waiting for whom; created without a prototype so that
    // the chain walk below can never pick up an inherited member
    var dependency = Object.create(null);
    // a channel for each subscriber, closed when that subscriber is done
    var done = mapObject(clients, function() { return chan.chan(); });

    // returns a task that waits for waitedFor to finish
    var waitFor = function(waiting, waitedFor) {
      // Ids are used as property keys, so compare them the way keys compare:
      // as strings. (Subscribers are enumerated with Object.keys and thus
      // arrive as strings even when they were registered as numbers.)
      var source = String(waiting);
      var target = String(waitedFor);

      // Only ids that got a channel for this dispatch can be waited for. An
      // own-property test keeps names such as 'toString' from passing as
      // subscribers.
      if (!hasOwn.call(done, target)) {
        console.error('WARNING: waitFor(' + waitedFor + ') by ' + waiting
          + ' ignored (no such subscriber)');
        return;
      }

      return ceci.go(function*() {
        // deadlock detection
        // CAUTION: this will not work as is in a multi-threaded system
        var t = target;
        while (dependency[t] !== undefined)
          t = dependency[t];

        // in case of deadlock, issue a warning and return
        if (t === source) {
          console.error('WARNING: waitFor(' + waitedFor + ') by ' + waiting
            + ' ignored (would create deadlock)');
          return;
        }

        // register dependency, wait for completion, remove dependency
        dependency[source] = target;
        try {
          yield chan.pull(done[target]);
        } finally {
          // never leave a stale edge behind for later deadlock checks
          delete dependency[source];
        }
      });
    };

    // create an asynchronous task for each subscriber
    var tasks = Object.keys(clients).map(function(id) {
      return ceci.go(function*() {
        try {
          // run callback for subscriber id and wait for completion
          yield clients[id](payload, function(other) {
            return waitFor(id, other);
          });
        } finally {
          // signal completion of subscriber id, also when its callback
          // failed, so that subscribers waiting for it are not left hanging
          // (all pulls from a closed channel return immediately)
          chan.close(done[id]);
        }
      });
    });

    // return a compound task that waits for all these tasks to finish
    return ceci.join(tasks);
  };

  // main dispatcher loop: reads and fulfills requests from the queue
  // NOTE(review): an exception coming out of `yield dispatch(...)` is not
  // caught here, so it presumably ends this loop and is reported by detach --
  // confirm against ceci.join's failure semantics.
  detach(function*() {
    var task;
    while (undefined != (task = yield chan.pull(queue))) {
      switch (task.action) {
      case 'subscribe':
        subscribe(task.id, task.handler);
        break;
      case 'unsubscribe':
        unsubscribe(task.id);
        break;
      case 'dispatch':
        yield dispatch(task.payload);
        break;
      }
    }
  });

  // the methods of the returned object simply queue requests
  return {
    subscribe: function(id, handler) {
      return chan.push(queue, {
        action : 'subscribe',
        id     : id,
        handler: handler
      });
    },
    unsubscribe: function(id) {
      return chan.push(queue, {
        action: 'unsubscribe',
        id    : id
      });
    },
    dispatch: function(payload) {
      return chan.push(queue, {
        action : 'dispatch',
        payload: payload
      });
    }
  };
};
// a quick test function for the dispatcher functionality
//
// Subscribes clients 1 to 4, dispatches 'Hello!', then subscribes client 5
// and dispatches 'Goodbye!'. Every subscriber logs what it receives and when
// it is done. On 'Hello!' the subscribers 1, 2 and 3 wait for each other in
// a circle, and subscriber 4 waits for 5, which only gets subscribed after
// the first dispatch.
var test = function() {
  // writes '<subject> <predicate> <object as JSON>' to the console
  var log = function(subject, predicate, object) {
    var parts = [subject, predicate, JSON.stringify(object)];
    console.log(parts.join(' '));
  };

  // predicates deciding whether a subscriber waits on a given payload
  var always = function() { return true; };
  var never = function() { return false; };
  var onHello = function(payload) { return payload == 'Hello!'; };

  // builds the handler of subscriber `number`, which waits for subscriber
  // `target` whenever shouldWait(payload) holds
  var subscriber = function(number, target, shouldWait) {
    var me = 'Subscriber ' + number;

    return function(payload, waitFor) {
      return ceci.go(function*() {
        log(me, 'received', payload);
        if (shouldWait(payload)) {
          log(me, 'waiting for', target);
          yield waitFor(target);
          log(me, 'resuming on', payload);
        }
        log(me, 'done with', payload);
      });
    };
  };

  detach(function*() {
    var d = dispatcher();

    yield d.subscribe(1, subscriber(1, 2, onHello));
    yield d.subscribe(2, subscriber(2, 3, always));
    yield d.subscribe(3, subscriber(3, 1, always));
    yield d.subscribe(4, subscriber(4, 5, always));
    yield d.dispatch('Hello!');

    yield d.subscribe(5, subscriber(5, undefined, never));
    yield d.dispatch('Goodbye!');
  });
};
// when this file is the entry point (and not merely required by another
// module), run the test function
if (require.main === module) {
  test();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment