Created
February 11, 2015 10:11
-
-
Save odf/1341324715e21c49d132 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
var csp = require('plexus-csp'); | |
// Creates a detached (i.e. never yielded) asynchronous task with some error
// reporting upon failure. The explicit error handling is necessary because
// error propagation to the calling task only works if we yield the result.
//
// task: whatever csp.go() accepts (the callers in this file pass generator
//       functions).
// Returns nothing; a failure of the task is only reported via console.log.
var detach = function(task) {
  csp.go(task).then(null, function(ex) {
    // A task can fail with any value, not only an Error. Reading `.stack`
    // from null/undefined would throw inside this handler and hide the
    // original failure, so only append the stack when there is one.
    var hasStack = ex != null && ex.stack !== undefined;
    console.log(hasStack ? ex + '\n' + ex.stack : String(ex));
  });
};
// Starts fn(payload) inside its own detached task. Once the value yielded
// from fn(payload) is available, a record { id, result } is pushed onto
// outchan so the receiver can tell which input the result belongs to.
//
// fn:      function called with payload; its return value is yielded
// payload: value handed to fn
// id:      tag copied into the pushed record
// outchan: channel receiving the { id, result } record
var runAsync = function(fn, payload, id, outchan) {
  function* job() {
    var result = yield fn(payload);
    // The push itself is not yielded: the task ends right after issuing it.
    outchan.push({ id: id, result: result });
  }

  detach(job);
};
// Runs fn asynchronously on each value from the input channel and puts
// the results onto the output channel in input order.
//
// input:  channel to read from; reading `undefined` is treated as "closed"
// output: channel to write to; a push yielding `false` is treated as "closed"
// fn:     function started via runAsync for every value read from input
//
// Returns the task created by csp.go(). Its loop ends once input is closed
// and either every started computation has been written to output or output
// has been closed.
//
// NOTE(review): a computation whose result is `undefined` is never considered
// ready, so it blocks all later outputs; a computation that fails never
// delivers a result at all. Both cases leave the loop waiting.
var asyncMap = function(input, output, fn) {
  return csp.go(function*() {
    var unsortedResults = csp.chan();
    // finished results waiting for their turn, keyed by input id
    var pending = new Map();
    // local copies, set to null once the respective channel is closed
    var source = input;
    var sink = output;
    var nextInputID = 1;
    var nextOutputID = 1;

    while (source || (sink && nextOutputID < nextInputID)) {
      // always try to read new inputs and new computation results
      var actionsTried = [source, unsortedResults];

      // if we have the next result, try to write it to output
      var nextResult = pending.get(nextOutputID);
      if (sink && nextResult !== undefined) {
        actionsTried.push([sink, nextResult]);
      }

      // perform exactly one of those two or three read/write actions
      var actionPerformed = yield csp.select.apply(null, actionsTried);

      // branch according to which action was successful
      if (actionPerformed.channel === source) {
        if (actionPerformed.value === undefined) {
          // input was closed
          source = null;
        } else {
          runAsync(fn, actionPerformed.value, nextInputID, unsortedResults);
          ++nextInputID;
        }
      } else if (actionPerformed.channel === sink) {
        if (actionPerformed.value === false) {
          // output was closed: nothing buffered can be delivered any more,
          // so release it instead of holding on to it until input ends
          sink = null;
          pending.clear();
        } else {
          pending.delete(nextOutputID);
          ++nextOutputID;
        }
      } else if (sink) {
        // we got a result via unsortedResults
        pending.set(actionPerformed.value.id, actionPerformed.value.result);
      }
      // else: a result arrived after output was closed; it is discarded
      // rather than stored, since it could never be written out
    }
  });
};
// Demo computation: sleeps for a random time below one second, logs the
// start, sleeps again, logs the end, and returns n wrapped in angle
// brackets as a string. Returns the task created by csp.go().
var asynchronousComputation = function(n) {
  function* simulateWork() {
    yield csp.sleep(Math.random() * 1000);
    console.log("start processing "+n);
    yield csp.sleep(Math.random() * 1000);
    console.log("end processing "+n);
    return "<"+n+">";
  }

  return csp.go(simulateWork);
};
// Demo: pushes the numbers 1..9 through asyncMap and prints each result.
var input = csp.chan();
var output = csp.chan();

detach(function*() {
  // feeds 1..9 into the input channel, then closes it
  var producer = csp.go(function*() {
    for (var i = 1; i < 10; ++i)
      yield input.push(i);
    input.close();
  });

  // prints every value that arrives on the output channel
  var consumer = csp.each(console.log, output);

  // asyncMap never closes its output channel, so close it here once the
  // mapping task has ended (successfully or not).
  // NOTE(review): assumes csp.each only finishes when its channel is closed,
  // in which case the join below could not complete otherwise; confirm
  // against the plexus-csp documentation.
  var mapper = csp.go(function*() {
    try {
      yield asyncMap(input, output, asynchronousComputation);
    } finally {
      output.close();
    }
  });

  yield csp.join([producer, consumer, mapper]);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
My take on an "asynchronous map" implemented with Go-style channels. Individual computations are done in parallel, but the results are written in order. Termination and error semantics may need some thought, and probably depend a bit on the intended application.