-
-
Save amasad/89e99dd4e7cf5704c816 to your computer and use it in GitHub Desktop.
var spawn = require('child_process').spawn; | |
var csp = require('js-csp'); | |
class Math { | |
constructor(readyCallback) { | |
this._readyCallback = readyCallback; | |
this._worker = spawn('6to5-node', ['worker.js']); | |
this._worker.stderr.pipe(process.stderr); | |
this._reqChan = csp.chan(); | |
this._resChan = csp.chan() | |
let ch = this._inChan = csp.chan(); | |
this._worker.stdout.on('data', (msg) => csp.go(function*() { | |
yield csp.put(ch, msg.toString().trim()); | |
})); | |
csp.spawn(this._run()); | |
} | |
*_run() { | |
let msg = yield csp.take(this._inChan); | |
if (msg != 'ready') { | |
throw new Error('Expected ready signal but got : ' + msg); | |
} | |
this._readyCallback(); | |
while (1) { | |
let msg = yield csp.take(this._reqChan) | |
this._worker.stdin.write(msg.join(' ')); | |
let reply = yield csp.take(this._inChan); | |
yield csp.put(this._resChan, reply); | |
} | |
} | |
add(a, b, callback) { | |
var self = this; | |
csp.go(function*() { | |
yield csp.put(self._reqChan, ['add', a, b]); | |
callback(yield csp.take(self._resChan)); | |
}); | |
} | |
subtract(a, b, callback) { | |
var self = this; | |
csp.go(function*() { | |
yield csp.put(self._reqChan, ['subtract', a, b]); | |
callback(yield csp.take(self._resChan)); | |
}); | |
} | |
} | |
var math = new Math(onMathReady); | |
function onMathReady() { | |
math.add(1, 2, function(result) { | |
console.log(' 1 + 2 = ', result); | |
}); | |
math.subtract(2, 2, function(result) { | |
console.log(' 2 - 2 = ', result); | |
}); | |
} |
~/code/csp-test $ 6to5-node --experimental --ignore=lol stateful.js | |
1 + 2 = 3 | |
2 - 2 = 0 | |
~/code/csp-test $ 6to5-node --experimental --ignore=lol csp.js | |
1 + 2 = 3 | |
2 - 2 = 0 | |
var spawn = require('child_process').spawn; | |
const STATE_START = 1; | |
const STATE_READY = 2 | |
const STATE_WORKING = 3; | |
class Math { | |
constructor(readyCallback) { | |
this._state = STATE_START; | |
this._readyCallback = readyCallback; | |
this._msgQueue = []; | |
this._inflightMsgCallback = null; | |
this._worker = spawn('6to5-node', ['worker.js']); | |
this._worker.stdout.on('data', this._messageHandler.bind(this)); | |
this._worker.stderr.pipe(process.stderr); | |
} | |
add(a, b, callback) { | |
this._sendMessage(['add', a, b], callback); | |
} | |
subtract(a, b, callback) { | |
this._sendMessage(['subtract', a, b], callback); | |
} | |
_messageHandler(message) { | |
message = message.toString().trim(); | |
if (this._state === STATE_START) { | |
if (message !== 'ready') { | |
throw new Error('Expected ready message but got: ' + message); | |
} else { | |
this._state = STATE_READY; | |
this._readyCallback(); | |
} | |
} else if (this._state === STATE_WORKING) { | |
this._inflightMsgCallback(message); | |
this._state = STATE_READY; | |
this._processQueue(); | |
} else { | |
throw new Error('Unexpected message in ready state'); | |
} | |
} | |
_sendMessage(data, callback) { | |
if (this._state === STATE_START) { | |
throw new Error('Invalid state'); | |
} | |
this._msgQueue.push({ | |
data: data.join(' '), | |
callback: callback | |
}); | |
this._processQueue(); | |
} | |
_processQueue() { | |
if (this._state !== STATE_READY) { | |
return; | |
} | |
var msg = this._msgQueue.shift(); | |
if (!msg) { | |
return; | |
} | |
this._inflightMsgCallback = msg.callback; | |
this._state = STATE_WORKING; | |
this._worker.stdin.write(msg.data); | |
} | |
} | |
var math = new Math(onMathReady); | |
function onMathReady() { | |
math.add(1, 2, function(result) { | |
console.log(' 1 + 2 = ', result); | |
}); | |
math.subtract(2, 2, function(result) { | |
console.log(' 2 - 2 = ', result); | |
}); | |
} |
/* | |
protocol: | |
--> ready | |
<-- add 1 2 | |
--> 3 | |
*/ | |
var methods = { | |
add(a,b) { return parseInt(a) + parseInt(b) }, | |
subtract(a, b) {return parseInt(a) - parseInt(b) } | |
}; | |
var working = false; | |
process.stdin.on('data', function(msg) { | |
if (working) { | |
throw new Error('Can only process one request at a time'); | |
} | |
working = true | |
var parts = msg.toString().split(/\s+/); | |
setTimeout(function() { | |
process.stdout.write(methods[parts[0]].apply(null, parts.slice(1)) + '\n'); | |
working = false; | |
}, 500); | |
}); | |
process.stdout.write('ready\n'); |
So here's how I'd do it with streams and the async library.
I mean, essentially - your csp channels are acting like queues, so I'm not convinced about the "no explicit queueing" - you can essentially achieve the same thing using something that supports an async push and pop. It seems to me that the generators and yield
are doing most of the work here.
The generators allow you to yield
inline which is a nice syntax, but stream.once
or async.series
aren't really that much harder - just a bit more syntax declaring (or binding) the callbacks.
I don't necessarily think this is better at all – just that it's not that different from channels if you use an async queuing mechanism.
var util = require('util'),
spawn = require('child_process').spawn,
Transform = require('stream').Transform,
async = require('async')
function MyMath(readyCallback) {
this._sendQueue = async.queue(this._sendRcvMessage.bind(this))
this._worker = spawn('node', ['worker.js'])
this._worker.stderr.pipe(process.stderr)
this._run(this._worker.stdout, readyCallback)
}
MyMath.prototype.add = function(a, b, callback) {
this._sendQueue.push({args: ['add', a, b]}, callback)
}
MyMath.prototype.subtract = function(a, b, callback) {
this._sendQueue.push({args: ['subtract', a, b]}, callback)
}
MyMath.prototype._run = function(workerStream, readyCallback) {
this._recvStream = workerStream.pipe(new Trimify())
this._recvStream.once('data', function(message) {
if (message != 'ready') {
throw new Error('Expected ready message but got: ' + message)
}
readyCallback()
})
}
MyMath.prototype._sendRcvMessage = function(data, callback) {
this._worker.stdin.write(data.args.join(' '),
this._recvStream.once.bind(this._recvStream, 'data', callback))
}
// Could have been defined inline if we wanted, but for illustration...
function Trimify() {
Transform.call(this, {objectMode: true})
}
util.inherits(Trimify, Transform)
Trimify.prototype._transform = function(data, encoding, callback) {
callback(null, data.toString().trim())
}
var math = new MyMath(onMathReady)
function onMathReady() {
math.add(1, 2, function(result) {
console.log(' 1 + 2 = ', result)
})
math.subtract(2, 2, function(result) {
console.log(' 2 - 2 = ', result)
})
}
Neat! Thanks for taking the time.
I think, the use of .once
on a data stream is pretty much a hack. And I did that a lot when I want to introduce code localityand not have a global data handler. With .once
you have to be there at the right place at the right time which makes the code very brittle.
Secondly, what if one of the methods is more chatty. With CSP, you can have all the interaction in the process:
guessNumber(callback) {
var self = this;
var number = 1;
csp.go(function*() {
yield csp.put(self._reqChan, ['guessNumber']);
while (true) {
var reply = yield csp.take(self._resChan));
if (reply === number) {
yield csp.put(self._reqChan, ['goodEnough']);
callback(true);
break;
}
yield csp.put(self._reqChan, ['notGoodEnough']);
});
}
When you say "right place right time", I'm not entirely sure what you mean – have you got an example? Are you talking about complications that can happen with nextTick or something?
I guess the one thing that is easier is that you don't end up quite as "pyramidy" by default – using callbacks you have to split up your flow a little more.
TBH, I often think that's a good thing, I mean, I wouldn't want too much more than what you've got there in terms of flow to figure out what's going on.
But you're right in that the simple callback case ends up a little messier... but I dunno... not too bad.
Again, I mostly think it's the generators and yields that are helping you here
reqChan.send('guessNumber', function checkGuess() {
resChan.once('data', function(reply) {
if (reply === number) {
return reqChan.send('goodEnough', callback.bind(null, true))
}
reqChan.send('notGoodEnough', checkGuess)
})
})
Oh, I meant it doesn't block nor buffer. If someone writes to a stream before your once listener is attached, you'll miss that message.
Yes, I think we're back to square one with callback hell etc. And yes, I think generators are the magic sauce, however, you can contrast async/await to channels, both underlying implementation is generator, but one is more flexible than the other. With channels you can have two communications and you can use them as promises or streams. They're a superior abstraction, I think.
Another major difference between CSP and FRP is that FRP makes pub-sub very straight forward.
CSP also supports pub-sub, with slightly more code. But CSP also supports the inbox pattern. Which is to say, single receiver per item in the stream.
A common use case I can think of is, a stream of values, that need to be given to one of four identical web-workers to do some slow computation. With FRP is would be fairly trivial distribute the values equally among the four workers. But with CSP you can give values to whichever worker is ready. All you need is one go block per worker.
Example code here:
https://gist.github.com/nmn/f9124e3e998297507111
I would love to see an FRP implementation. I can't think of one.
Here are the benefits of the csp approach: