Skip to content

Instantly share code, notes, and snippets.

@mhart
Forked from amasad/csp.js
Last active March 15, 2017 23:10
Show Gist options
  • Save mhart/81a997233d2c37395d92 to your computer and use it in GitHub Desktop.
Save mhart/81a997233d2c37395d92 to your computer and use it in GitHub Desktop.
var spawn = require('child_process').spawn;
var csp = require('js-csp');
class Math {
constructor(readyCallback) {
this._readyCallback = readyCallback;
this._worker = spawn('6to5-node', ['worker.js']);
this._worker.stderr.pipe(process.stderr);
this._reqChan = csp.chan();
this._resChan = csp.chan()
let ch = this._inChan = csp.chan();
this._worker.stdout.on('data', (msg) => csp.go(function*() {
yield csp.put(ch, msg.toString().trim());
}));
csp.spawn(this._run());
}
*_run() {
let msg = yield csp.take(this._inChan);
if (msg != 'ready') {
throw new Error('Expected ready signal but got : ' + msg);
}
this._readyCallback();
while (1) {
let msg = yield csp.take(this._reqChan)
this._worker.stdin.write(msg.join(' '));
let reply = yield csp.take(this._inChan);
yield csp.put(this._resChan, reply);
}
}
add(a, b, callback) {
var self = this;
csp.go(function*() {
yield csp.put(self._reqChan, ['add', a, b]);
callback(yield csp.take(self._resChan));
});
}
subtract(a, b, callback) {
var self = this;
csp.go(function*() {
yield csp.put(self._reqChan, ['subtract', a, b]);
callback(yield csp.take(self._resChan));
});
}
}
var math = new Math(onMathReady);
function onMathReady() {
math.add(1, 2, function(result) {
console.log(' 1 + 2 = ', result);
});
math.subtract(2, 2, function(result) {
console.log(' 2 - 2 = ', result);
});
}
~/code/csp-test $ 6to5-node --experimental --ignore=lol stateful.js
1 + 2 = 3
2 - 2 = 0
~/code/csp-test $ 6to5-node --experimental --ignore=lol csp.js
1 + 2 = 3
2 - 2 = 0
var spawn = require('child_process').spawn;
const STATE_START = 1;
const STATE_READY = 2
const STATE_WORKING = 3;
class Math {
constructor(readyCallback) {
this._state = STATE_START;
this._readyCallback = readyCallback;
this._msgQueue = [];
this._inflightMsgCallback = null;
this._worker = spawn('6to5-node', ['worker.js']);
this._worker.stdout.on('data', this._messageHandler.bind(this));
this._worker.stderr.pipe(process.stderr);
}
add(a, b, callback) {
this._sendMessage(['add', a, b], callback);
}
subtract(a, b, callback) {
this._sendMessage(['subtract', a, b], callback);
}
_messageHandler(message) {
message = message.toString().trim();
if (this._state === STATE_START) {
if (message !== 'ready') {
throw new Error('Expected ready message but got: ' + message);
} else {
this._state = STATE_READY;
this._readyCallback();
}
} else if (this._state === STATE_WORKING) {
this._inflightMsgCallback(message);
this._state = STATE_READY;
this._processQueue();
} else {
throw new Error('Unexpected message in ready state');
}
}
_sendMessage(data, callback) {
if (this._state === STATE_START) {
throw new Error('Invalid state');
}
this._msgQueue.push({
data: data.join(' '),
callback: callback
});
this._processQueue();
}
_processQueue() {
if (this._state !== STATE_READY) {
return;
}
var msg = this._msgQueue.shift();
if (!msg) {
return;
}
this._inflightMsgCallback = msg.callback;
this._state = STATE_WORKING;
this._worker.stdin.write(msg.data);
}
}
var math = new Math(onMathReady);
function onMathReady() {
math.add(1, 2, function(result) {
console.log(' 1 + 2 = ', result);
});
math.subtract(2, 2, function(result) {
console.log(' 2 - 2 = ', result);
});
}
/*
protocol:
--> ready
<-- add 1 2
--> 3
*/
var methods = {
add(a,b) { return parseInt(a) + parseInt(b) },
subtract(a, b) {return parseInt(a) - parseInt(b) }
};
var working = false;
process.stdin.on('data', function(msg) {
if (working) {
throw new Error('Can only process one request at a time');
}
working = true
var parts = msg.toString().split(/\s+/);
setTimeout(function() {
process.stdout.write(methods[parts[0]].apply(null, parts.slice(1)) + '\n');
working = false;
}, 500);
});
process.stdout.write('ready\n');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment