Skip to content

Instantly share code, notes, and snippets.

@odf
Created February 11, 2015 10:11
Show Gist options
  • Save odf/1341324715e21c49d132 to your computer and use it in GitHub Desktop.
Save odf/1341324715e21c49d132 to your computer and use it in GitHub Desktop.
'use strict';
var csp = require('plexus-csp');
// Launches `task` as a detached asynchronous task, i.e. one whose result is
// never yielded by any caller. Error propagation to the calling task only
// works when the result is yielded, so a failure of a detached task is
// reported on the console here instead of being lost.
//
// task: generator function handed to csp.go().
// Returns nothing; the only observable effect is the console output written
// when the task fails.
const detach = function(task) {
  csp.go(task).then(null, function(ex) {
    // A rejection reason is not guaranteed to be an Error: it may be null,
    // undefined, or a primitive. Reading `.stack` from null/undefined would
    // throw inside this handler and hide the failure we are trying to report,
    // so look the stack up defensively.
    const stack = ex == null ? undefined : ex.stack;
    // With a stack available the output is the reason followed by the stack;
    // without one, only the reason is printed.
    console.log(stack === undefined ? String(ex) : ex + '\n' + stack);
  });
};
// Applies the asynchronous function `fn` to `payload` inside its own detached
// task. Once the outcome is available, `{ id, result }` is pushed onto
// `outchan`, so the receiver can tell which request a result belongs to.
//
// fn:      function whose return value is yielded to obtain the result
// payload: the single argument passed to fn
// id:      tag attached to the result
// outchan: channel the tagged result is pushed onto
const runAsync = function(fn, payload, id, outchan) {
  const job = function*() {
    const outcome = yield fn(payload);
    // The push itself is not yielded.
    outchan.push({ id: id, result: outcome });
  };

  detach(job);
};
// Reads values from the `input` channel, starts `fn` on each of them via
// runAsync() without waiting for earlier ones to finish, and writes the
// results to the `output` channel in the order the inputs were read.
//
// Every input gets a sequential id. Finished computations report back on an
// internal channel tagged with that id and are parked until all results with
// smaller ids have been written.
//
// Returns whatever csp.go() returns for the coordinating task.
const asyncMap = function(input, output, fn) {
  return csp.go(function*() {
    const finished = csp.chan();   // tagged results, in completion order
    const parked = new Map();      // id -> result not yet written to output
    let source = input;            // set to null once input is closed
    let sink = output;             // set to null once output is closed
    let nextInputID = 1;           // id assigned to the next value read
    let nextOutputID = 1;          // id of the next result due on output

    // Keep going while more input may arrive, or while results for inputs
    // already read are still owed to an open output.
    while (source || (sink && nextOutputID < nextInputID)) {
      // Always offer to read a new input and a new computation result ...
      const offers = [source, finished];

      // ... and, if the result that is due next has arrived, offer to write it.
      const due = parked.get(nextOutputID);
      if (sink && due !== undefined) {
        offers.push([sink, due]);
      }

      // Exactly one of the two or three offered actions is performed.
      const done = yield csp.select.apply(null, offers);

      if (done.channel === source) {
        if (done.value === undefined) {
          // input was closed
          source = null;
          continue;
        }
        runAsync(fn, done.value, nextInputID, finished);
        nextInputID += 1;
        continue;
      }

      if (done.channel === sink) {
        if (done.value == false) {
          // output was closed
          sink = null;
          continue;
        }
        parked.delete(nextOutputID);
        nextOutputID += 1;
        continue;
      }

      // Otherwise a computation finished: park its result under its id.
      parked.set(done.value.id, done.value.result);
    }
  });
};
// Stand-in for a slow asynchronous operation, used by the demo below.
// Sleeps for a random time, logs the start of processing, sleeps again, logs
// the end, and produces the string "<n>" as the result of the task.
//
// Returns whatever csp.go() returns for that task.
const asynchronousComputation = function(n) {
  // Sleep for a random duration below 1000 (the argument given to csp.sleep).
  const pauseBriefly = () => csp.sleep(Math.random() * 1000);

  const simulate = function*() {
    yield pauseBriefly();
    console.log("start processing "+n);

    yield pauseBriefly();
    console.log("end processing "+n);

    return "<"+n+">";
  };

  return csp.go(simulate);
};
// Demo: feed the numbers 1 through 9 through asyncMap() and print each result
// as it appears on the output channel.
const input = csp.chan();
const output = csp.chan();

detach(function*() {
  // Pushes 1..9 onto the input channel one at a time, then closes it.
  const producer = csp.go(function*() {
    let i = 1;
    while (i < 10) {
      yield input.push(i);
      i += 1;
    }
    input.close();
  });

  // Hands the output channel to csp.each together with console.log.
  const consumer = csp.each(console.log, output);

  // Connects input to output through asynchronousComputation.
  const mapper = asyncMap(input, output, asynchronousComputation);

  // NOTE(review): nothing visible here closes `output`, so `consumer` (and
  // with it this join) may never complete - confirm whether that is intended.
  yield csp.join([producer, consumer, mapper]);
});
@odf
Copy link
Author

odf commented Feb 11, 2015

My take on an "asynchronous map" implemented with Go-style channels. Individual computations are done in parallel, but the results are written in order. Termination and error semantics may need some thought, and probably depend a bit on the intended application.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment