Skip to content

Instantly share code, notes, and snippets.

@amasad
Created January 18, 2015 03:52
Show Gist options
  • Save amasad/89e99dd4e7cf5704c816 to your computer and use it in GitHub Desktop.
Save amasad/89e99dd4e7cf5704c816 to your computer and use it in GitHub Desktop.
A contrived example to play around with js-csp in solving problems that would traditionally employ a state machine. Imagine having a math library that needs to call into an RPC system to compute math functions. The service can only handle one request per client at a time, so the client needs to do the buffering. (See comments for more info.)
var spawn = require('child_process').spawn;
var csp = require('js-csp');
/**
 * Client for the math worker process, written with js-csp channels.
 *
 * Three channels are used:
 *  - `_inChan`  carries every trimmed chunk the worker prints on stdout,
 *  - `_reqChan` carries requests (arrays such as ['add', 1, 2]) from callers,
 *  - `_resChan` carries the worker's reply back to the caller.
 *
 * The `_run` process takes one request, writes it to the worker, waits for
 * one reply and hands that reply over before it takes the next request, so
 * only one request is in flight at any time.
 */
class Math {
  /**
   * @param {Function} readyCallback - invoked with no arguments once the
   *   worker's first message ('ready') has been received.
   */
  constructor(readyCallback) {
    this._readyCallback = readyCallback;

    const worker = spawn('6to5-node', ['worker.js']);
    worker.stderr.pipe(process.stderr);
    this._worker = worker;

    this._reqChan = csp.chan();
    this._resChan = csp.chan();

    const inbox = csp.chan();
    this._inChan = inbox;

    // Each stdout chunk is forwarded to the inbox channel from its own
    // go block, because a channel put has to be yielded from a process.
    worker.stdout.on('data', (chunk) => {
      csp.go(function*() {
        yield csp.put(inbox, chunk.toString().trim());
      });
    });

    csp.spawn(this._run());
  }

  /**
   * Main process: waits for the 'ready' handshake, then serves requests one
   * at a time for as long as the channels stay open.
   *
   * @throws {Error} if the first message from the worker is not 'ready'.
   */
  *_run() {
    const greeting = yield csp.take(this._inChan);
    if (greeting !== 'ready') {
      throw new Error('Expected ready signal but got : ' + greeting);
    }
    this._readyCallback();

    for (;;) {
      const request = yield csp.take(this._reqChan);
      this._worker.stdin.write(request.join(' '));
      const reply = yield csp.take(this._inChan);
      yield csp.put(this._resChan, reply);
    }
  }

  /**
   * Queues `request` for the worker and passes the reply to `callback`.
   *
   * @param {Array} request - method name followed by its arguments.
   * @param {Function} callback - receives the worker's reply as a string.
   */
  _request(request, callback) {
    const requests = this._reqChan;
    const responses = this._resChan;
    csp.go(function*() {
      yield csp.put(requests, request);
      callback(yield csp.take(responses));
    });
  }

  /** Asks the worker for `a + b`; `callback` receives the reply. */
  add(a, b, callback) {
    this._request(['add', a, b], callback);
  }

  /** Asks the worker for `a - b`; `callback` receives the reply. */
  subtract(a, b, callback) {
    this._request(['subtract', a, b], callback);
  }
}
/**
 * Runs once the client reports that the worker is ready: issues an addition
 * and a subtraction back to back and prints each reply as it arrives.
 */
function onMathReady() {
  // Builds a callback that logs the reply after the given label.
  const report = (label) => (result) => {
    console.log(label, result);
  };

  math.add(1, 2, report(' 1 + 2 = '));
  math.subtract(2, 2, report(' 2 - 2 = '));
}

// `onMathReady` is a hoisted declaration, and it is only invoked after the
// worker's handshake arrives, by which time `math` has been assigned.
const math = new Math(onMathReady);
~/code/csp-test $ 6to5-node --experimental --ignore=lol stateful.js
1 + 2 = 3
2 - 2 = 0
~/code/csp-test $ 6to5-node --experimental --ignore=lol csp.js
1 + 2 = 3
2 - 2 = 0
var spawn = require('child_process').spawn;
// States of the client's conversation with the worker process.

// Waiting for the worker's initial 'ready' message.
const STATE_START = 1;

// Idle: the next queued request may be written to the worker.
const STATE_READY = 2;

// A request has been written and its reply is still awaited.
const STATE_WORKING = 3;
/**
 * Client for the math worker process, written as an explicit state machine.
 *
 * The worker handles a single request at a time, so requests are buffered in
 * `_msgQueue` and written one by one: the next request is sent only after
 * the reply to the previous one has been received.
 */
class Math {
  /**
   * @param {Function} readyCallback - invoked with no arguments once the
   *   worker's 'ready' message has been received.
   */
  constructor(readyCallback) {
    this._state = STATE_START;
    this._readyCallback = readyCallback;
    this._msgQueue = [];
    this._inflightMsgCallback = null;
    this._worker = spawn('6to5-node', ['worker.js']);
    this._worker.stdout.on('data', this._messageHandler.bind(this));
    this._worker.stderr.pipe(process.stderr);
  }

  /** Asks the worker for `a + b`; `callback` receives the reply as a string. */
  add(a, b, callback) {
    this._sendMessage(['add', a, b], callback);
  }

  /** Asks the worker for `a - b`; `callback` receives the reply as a string. */
  subtract(a, b, callback) {
    this._sendMessage(['subtract', a, b], callback);
  }

  /**
   * Handles one chunk of worker stdout according to the current state.
   *
   * @param {Buffer|string} message - raw chunk; it is stringified and trimmed.
   * @throws {Error} if the first message is not 'ready', or if a message
   *   arrives while no request is in flight.
   */
  _messageHandler(message) {
    const text = message.toString().trim();

    if (this._state === STATE_START) {
      if (text !== 'ready') {
        throw new Error('Expected ready message but got: ' + text);
      }
      this._state = STATE_READY;
      this._readyCallback();
      return;
    }

    if (this._state !== STATE_WORKING) {
      throw new Error('Unexpected message in ready state');
    }

    const callback = this._inflightMsgCallback;
    this._inflightMsgCallback = null;
    try {
      callback(text);
    } finally {
      // Leave the WORKING state even when the callback throws; otherwise the
      // client would stay stuck and every queued request would be lost.
      this._state = STATE_READY;
      this._processQueue();
    }
  }

  /**
   * Queues a request and sends it immediately if the worker is idle.
   *
   * @param {Array} data - method name followed by its arguments; the items
   *   are joined with single spaces to form the wire message.
   * @param {Function} callback - receives the worker's reply.
   * @throws {Error} if called before the worker has signalled 'ready'.
   */
  _sendMessage(data, callback) {
    if (this._state === STATE_START) {
      throw new Error('Invalid state');
    }
    this._msgQueue.push({
      data: data.join(' '),
      callback: callback
    });
    this._processQueue();
  }

  /**
   * Writes the oldest queued request to the worker, provided the client is
   * idle and the queue is not empty; otherwise does nothing.
   */
  _processQueue() {
    if (this._state !== STATE_READY) {
      return;
    }
    const next = this._msgQueue.shift();
    if (!next) {
      return;
    }
    this._inflightMsgCallback = next.callback;
    this._state = STATE_WORKING;
    this._worker.stdin.write(next.data);
  }
}
// The callback is a hoisted function declaration, so it can be passed here
// before its definition appears; it runs only after the worker's handshake.
const math = new Math(onMathReady);

/**
 * Called when the client has seen the worker's 'ready' message. Sends an
 * addition followed by a subtraction and logs each reply next to its label.
 */
function onMathReady() {
  const requests = [
    { send: (done) => math.add(1, 2, done), label: ' 1 + 2 = ' },
    { send: (done) => math.subtract(2, 2, done), label: ' 2 - 2 = ' },
  ];

  for (const { send, label } of requests) {
    send((result) => {
      console.log(label, result);
    });
  }
}
/*
protocol:
--> ready
<-- add 1 2
--> 3
*/
/**
 * Operations the worker can perform, keyed by the method name that appears
 * as the first word of a request. Arguments arrive as strings and are parsed
 * as base-10 integers; the radix is given explicitly so that input such as
 * '0x10' is not silently interpreted as hexadecimal.
 */
var methods = {
  /** Returns the integer sum of `a` and `b`. */
  add(a, b) {
    return parseInt(a, 10) + parseInt(b, 10);
  },

  /** Returns the integer difference `a - b`. */
  subtract(a, b) {
    return parseInt(a, 10) - parseInt(b, 10);
  }
};
// True while a request is being computed; the protocol allows only one
// request in flight, so a second one arriving in that window is an error.
let isBusy = false;

/**
 * Handles one chunk from stdin: splits it on whitespace into a method name
 * and its arguments, then writes the result followed by a newline to stdout
 * after a 500 ms delay.
 *
 * @param {Buffer|string} chunk - raw request, e.g. "add 1 2".
 * @throws {Error} if a request arrives while another is still in progress.
 */
function handleRequest(chunk) {
  if (isBusy) {
    throw new Error('Can only process one request at a time');
  }
  isBusy = true;

  const [name, ...args] = chunk.toString().split(/\s+/);

  setTimeout(() => {
    const result = methods[name].apply(null, args);
    process.stdout.write(result + '\n');
    isBusy = false;
  }, 500);
}

process.stdin.on('data', handleRequest);

// Handshake: tell the client that requests may now be sent.
process.stdout.write('ready\n');
@nmn
Copy link

nmn commented Apr 7, 2015

Another major difference between CSP and FRP is that FRP makes pub-sub very straightforward.
CSP also supports pub-sub, with slightly more code. But CSP also supports the inbox pattern, that is, a single receiver per item in the stream.

A common use case I can think of is a stream of values that need to be given to one of four identical web workers to do some slow computation. With FRP it would be fairly trivial to distribute the values equally among the four workers. But with CSP you can give values to whichever worker is ready. All you need is one go block per worker.

Example code here:
https://gist.github.com/nmn/f9124e3e998297507111

I would love to see an FRP implementation. I can't think of one.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment