Last active
August 29, 2015 14:05
-
-
Save odf/ff094c8c796883fba06b to your computer and use it in GitHub Desktop.
Asynchronous Flux-like dispatcher with deadlock prevention via Go-style CSP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Asynchronous Flux-like dispatcher with deadlock prevention via Go-style CSP. | |
// | |
// - my libraries ceci-core and ceci-channels are available from npm
// - pre-compile this file with regenerator | |
// - build with webpack or browserify | |
'use strict'; | |
var ceci = require('ceci-core'); | |
var chan = require('ceci-channels'); | |
/**
 * Clones an object, mapping each of its own enumerable properties via `fn`.
 *
 * Inherited properties are skipped. A `null` or `undefined` input yields an
 * empty object, because `for...in` simply visits nothing in that case.
 *
 * @param {Object} obj - the object whose own properties are mapped
 * @param {function(string, *): *} fn - called as `fn(key, value)`; its return
 *   value is stored under the same key in the result
 * @returns {Object} a new plain object holding the mapped values
 */
var mapObject = function(obj, fn) {
  // Borrow the method from Object.prototype so that objects without a
  // prototype, or with an own property named 'hasOwnProperty', work too.
  var hasOwn = Object.prototype.hasOwnProperty;
  var result = {};
  var key;

  for (key in obj) {
    if (hasOwn.call(obj, key)) {
      result[key] = fn(key, obj[key]);
    }
  }

  return result;
};
/**
 * Exception-handling variant of `ceci.go` for top-level go blocks.
 *
 * Starts `task` via `ceci.go` and, if the result is rejected, writes the
 * failure to `console.error`. Nothing is returned, so callers cannot wait on
 * the task.
 *
 * @param {Function} task - generator function to pass to `ceci.go`
 */
var detach = function(task) {
  ceci.go(task).then(null, function(ex) {
    // A rejection value need not be an Error (it may even be null or
    // undefined); only read `.stack` when it is actually there, so that
    // reporting a failure can never itself throw.
    var hasStack = ex !== null && ex !== undefined && ex.stack !== undefined;

    console.error(hasStack ? ex + '\n' + ex.stack : String(ex));
  });
};
/**
 * Creates and runs a dispatcher.
 *
 * All calls to the returned object are pushed onto one internal queue and
 * processed strictly in order by a single loop, so a dispatch is completed
 * before the next queued request is looked at.
 *
 * @returns {{subscribe: Function, unsubscribe: Function, dispatch: Function}}
 *   an object whose methods enqueue the corresponding request and return the
 *   result of `chan.push`
 */
var dispatcher = function() {
  var queue = chan.chan(50); // FIFO queue for all calls to dispatcher
  var clients = {};          // subscribed callbacks, keyed by id

  // subscribes a client; id must be a unique number or string
  var subscribe = function(id, handler) {
    clients[id] = handler;
  };

  // unsubscribes a client
  var unsubscribe = function(id) {
    delete clients[id];
  };

  // dispatches a payload to all subscribed clients; returns a compound task
  // that finishes when every subscriber's task has finished
  var dispatch = function(payload) {
    var dependency = {}; // who is currently waiting for whom

    // one completion channel per subscriber
    var done = mapObject(clients, function() { return chan.chan(); });

    // returns a task that waits for waitedFor to finish, or undefined (after
    // logging a warning) if there is no such subscriber
    var waitFor = function(waiting, waitedFor) {
      if (!clients[waitedFor]) {
        console.error('WARNING: waitFor(' + waitedFor + ') by ' + waiting
                      + ' ignored (no such subscriber)');
        return;
      }

      return ceci.go(function*() {
        // deadlock detection: follow the chain of dependencies starting at
        // waitedFor and see whether it leads back to the waiting subscriber
        // CAUTION: this will not work as is in a multi-threaded system
        var t = waitedFor;
        while (dependency[t] !== undefined)
          t = dependency[t];

        // in case of deadlock, issue a warning and return
        // NOTE: loose equality is deliberate here. `waiting` comes from
        // Object.keys and is always a string, whereas the ids stored in
        // `dependency` are whatever callers passed to waitFor (e.g. numbers).
        if (t == waiting) {
          console.error('WARNING: waitFor(' + waitedFor + ') by ' + waiting
                        + ' ignored (would create deadlock)');
          return;
        }

        // register dependency, wait for completion, remove dependency
        dependency[waiting] = waitedFor;
        try {
          yield chan.pull(done[waitedFor]);
        } finally {
          // always unregister, so a failed wait leaves no stale edge behind
          // for later deadlock checks
          delete dependency[waiting];
        }
      });
    };

    // create an asynchronous task for each subscriber
    var tasks = Object.keys(clients).map(function(id) {
      return ceci.go(function*() {
        try {
          // run callback for subscriber id and wait for completion
          yield clients[id](payload, function(other) {
            return waitFor(id, other);
          });
        } finally {
          // signal completion of subscriber id, even if its callback threw;
          // otherwise everyone waiting for it would stay blocked
          // (all pulls from a closed channel return immediately)
          chan.close(done[id]);
        }
      });
    });

    // return a compound task that waits for all these tasks to finish
    return ceci.join(tasks);
  };

  // main dispatcher loop: reads and fulfills requests from the queue until a
  // pull yields null/undefined
  detach(function*() {
    var task;

    while (undefined != (task = yield chan.pull(queue))) {
      switch (task.action) {
      case 'subscribe':
        subscribe(task.id, task.handler);
        break;
      case 'unsubscribe':
        unsubscribe(task.id);
        break;
      case 'dispatch':
        // wait for all subscribers before handling the next request
        yield dispatch(task.payload);
        break;
      }
    }
  });

  // the methods of the returned object simply queue requests
  return {
    subscribe: function(id, handler) {
      return chan.push(queue, {
        action : 'subscribe',
        id     : id,
        handler: handler
      });
    },
    unsubscribe: function(id) {
      return chan.push(queue, {
        action: 'unsubscribe',
        id    : id
      });
    },
    dispatch: function(payload) {
      return chan.push(queue, {
        action : 'dispatch',
        payload: payload
      });
    }
  };
};
// A quick manual test for the dispatcher: subscribers 1 -> 2 -> 3 -> 1 form a
// waiting cycle on 'Hello!', subscriber 4 waits for a subscriber 5 that only
// joins before the second dispatch. Progress is printed via console.log.
var test = function() {
  detach(function*() {
    var d = dispatcher();

    // prints "<subject> <predicate> <object as JSON>"
    var log = function(subject, predicate, object) {
      var parts = [subject, predicate, JSON.stringify(object)];
      console.log(parts.join(' '));
    };

    // builds a handler for subscriber `n` that logs its progress and, when
    // `shouldWait(payload)` holds, waits for subscriber `target` in between
    var makeHandler = function(n, target, shouldWait) {
      return function(payload, waitFor) {
        var me = 'Subscriber ' + n;

        return ceci.go(function*() {
          log(me, 'received', payload);
          if (shouldWait(payload)) {
            log(me, 'waiting for', target);
            yield waitFor(target);
            log(me, 'resuming on', payload);
          }
          log(me, 'done with', payload);
        });
      };
    };

    var always = function() { return true; };
    var never = function() { return false; };
    var onHello = function(payload) { return payload == 'Hello!'; };

    yield d.subscribe(1, makeHandler(1, 2, onHello));
    yield d.subscribe(2, makeHandler(2, 3, always));
    yield d.subscribe(3, makeHandler(3, 1, always));
    yield d.subscribe(4, makeHandler(4, 5, always));

    yield d.dispatch('Hello!');

    yield d.subscribe(5, makeHandler(5, undefined, never));

    yield d.dispatch('Goodbye!');
  });
};
// only kick off the test when this file is the program's entry point,
// not when it is loaded via require() from another module
if (module === require.main) {
  test();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment