Skip to content

Instantly share code, notes, and snippets.

@tlrobinson
Last active August 29, 2015 14:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tlrobinson/c8407e0e3f31a0da5400 to your computer and use it in GitHub Desktop.
Save tlrobinson/c8407e0e3f31a0da5400 to your computer and use it in GitHub Desktop.
Proof-of-concept "classical" FBP using JavaScript streams/promises, and optionally generators via Bluebird's "coroutine" method.
// Runs in iojs or node.js versions >= 0.11.2
//
// npm install bluebird
// node collate.js
//
var Promise = require('bluebird');
var stream = require('stream');
var util = require('util');
// Simple input port that reads the value of IPs from an array of values.
//
// Object-mode Readable stream: every stored value is emitted wrapped as
// `{ contents: value }`, and the stream ends once the values run out.
//
// array - optional list of values to emit, in order. The list is copied, so
//         the caller's array is left untouched while the port is drained.
function MockInPort(array) {
  stream.Readable.call(this, { objectMode: true });
  // Private queue: _read() consumes it with shift(), which must not eat the
  // caller's own data.
  this.array = (array || []).slice();
}
util.inherits(MockInPort, stream.Readable);

// Readable hook: pushes exactly one IP per call, or null (end-of-stream)
// once the queue is empty.
MockInPort.prototype._read = function(size) {
  // ignore "size" for now, just read one
  if (this.array.length > 0) {
    this.push({ contents: this.array.shift() });
  } else {
    this.push(null);
  }
};
// Simple output port: an object-mode Writable that collects the `contents`
// of every IP written to it.
//
// array - optional array that receives the contents (appended in write
//         order); a fresh array is used when none is given. Exposed as
//         `this.array`.
function MockOutPort(array) {
  stream.Writable.call(this, { objectMode: true });
  this.array = array ? array : [];
}
util.inherits(MockOutPort, stream.Writable);

// Writable hook: record the IP's contents, then acknowledge the write.
MockOutPort.prototype._write = function(ip, encoding, done) {
  var collected = this.array;
  collected.push(ip.contents);
  done();
};
// Patch all readable streams with a "receive" method which returns a Promise.
//
// The promise resolves with the next chunk read from the stream, or with
// null once the stream has ended. It rejects if the stream emits 'error'
// while the call is waiting for data.
//
// size - currently unused; a single read() is always attempted.
stream.Readable.prototype.receive = function(size) {
  var self = this;
  var data = this.read();
  if (data !== null) {
    return Promise.resolve(data);
  }
  // 'end' was already emitted: neither 'readable' nor 'end' will fire again,
  // so waiting would never settle. (readableEnded is undefined on old Node
  // versions, in which case we simply fall through to waiting.)
  if (self.readableEnded) {
    return Promise.resolve(null);
  }
  return new Promise(function (resolve, reject) {
    // Whichever event fires first settles the promise; the other listeners
    // are removed so they do not pile up across repeated receive() calls.
    function cleanup() {
      self.removeListener('readable', onReadable);
      self.removeListener('end', onEnd);
      self.removeListener('error', onError);
    }
    function onReadable() {
      cleanup();
      // may be null when 'readable' signals end-of-stream
      resolve(self.read());
    }
    function onEnd() {
      cleanup();
      resolve(null);
    }
    function onError(err) {
      cleanup();
      reject(err);
    }
    self.on('readable', onReadable);
    self.on('end', onEnd);
    self.on('error', onError);
  });
};
// Patch all writable streams with a "send" method.
//
// ip - the information packet (chunk) to write to the stream.
//
// Returns whatever write() returns: per the Node.js stream API this is false
// when the internal buffer has reached its high-water mark, i.e. the caller
// should wait for 'drain' before sending more. Callers that do not care
// about backpressure can keep ignoring the result.
stream.Writable.prototype.send = function(ip) {
  return this.write(ip);
};
// Port of JSFBP's collate component using Bluebird's "coroutine" method and
// "yield" instead of Fibers.
//
// Reads one IP from CTLFIELDS whose contents is a comma-separated list of
// field lengths; the sum of those lengths is the length of the key prefix.
// Then repeatedly forwards to OUT the pending IP (one per IN port) whose key
// prefix compares lowest, until every IN port has returned null.
// Ties go to the lowest port index.
//
// Returns a promise (from Promise.coroutine) that settles when the generator
// finishes or throws.
var collate1 = Promise.coroutine(function *collate1() {
  var ctlfields = InputPort.openInputPort('CTLFIELDS');
  var inportArray = InputPortArray.openInputPortArray('IN');
  var outport = OutputPort.openOutputPort('OUT');

  var ctlfieldsP = yield ctlfields.receive();
  // Read the contents before dropping, in case a real drop() invalidates the IP.
  var fields = ctlfieldsP.contents.split(',').map(function(str) { return parseInt(str, 10); });
  IP.drop(ctlfieldsP);
  var totalFieldLength = fields.reduce(function(acc, n) { return acc + n; }, 0);

  // Prime one pending IP per input port; portCount tracks ports not yet drained.
  var portCount = inportArray.length;
  var ips = [];
  for (var index = 0; index < inportArray.length; index++) {
    ips[index] = yield inportArray[index].receive();
    if (ips[index] === null) {
      portCount--;
    }
  }

  while (portCount) {
    // Start from "nothing chosen" instead of a sentinel key, so that an IP is
    // always selected even when its key sorts at or above "\uffff".
    var lowestIndex = -1;
    var lowestKey = null;
    ips.forEach(function(ip, portIndex) {
      if (ip === null) {
        return;
      }
      var key = ip.contents.substring(0, totalFieldLength);
      if (lowestIndex === -1 || key < lowestKey) {
        lowestKey = key;
        lowestIndex = portIndex;
      }
    });

    outport.send(ips[lowestIndex]);

    // Refill the slot that was just emitted.
    ips[lowestIndex] = yield inportArray[lowestIndex].receive();
    if (ips[lowestIndex] === null) {
      portCount--;
    }
  }
});
// Port of JSFBP's collate component using promises instead of Fibers.
//
// Reads one IP from CTLFIELDS whose contents is a comma-separated list of
// field lengths; the sum of those lengths is the length of the key prefix.
// Then repeatedly forwards to OUT the pending IP (one per IN port) whose key
// prefix compares lowest, until every IN port has returned null.
// Ties go to the lowest port index.
//
// Returns a promise that resolves once all input ports are drained.
var collate2 = function collate2() {
  var ctlfields = InputPort.openInputPort('CTLFIELDS');
  var inportArray = InputPortArray.openInputPortArray('IN');
  var outport = OutputPort.openOutputPort('OUT');
  var ips, portCount, totalFieldLength;

  // Index of the pending IP with the lowest key, or -1 if none is pending.
  // Starts from "nothing chosen" instead of a sentinel key, so an IP is
  // always selected even when its key sorts at or above "\uffff".
  function findLowestIndex() {
    var lowestIndex = -1;
    var lowestKey = null;
    ips.forEach(function(ip, portIndex) {
      if (ip === null) {
        return;
      }
      var key = ip.contents.substring(0, totalFieldLength);
      if (lowestIndex === -1 || key < lowestKey) {
        lowestKey = key;
        lowestIndex = portIndex;
      }
    });
    return lowestIndex;
  }

  // One merge step: emit the lowest pending IP, refill its slot, recurse
  // through the promise chain until no port has data left.
  function loop() {
    if (portCount <= 0) {
      return;
    }
    var lowestIndex = findLowestIndex();
    outport.send(ips[lowestIndex]);
    return inportArray[lowestIndex].receive().then(function(ip) {
      ips[lowestIndex] = ip;
      if (ip === null) {
        portCount--;
      }
      return loop();
    });
  }

  return ctlfields.receive().then(function (ctlfieldsP) {
    // Read the contents before dropping, in case a real drop() invalidates the IP.
    var fields = ctlfieldsP.contents.split(',').map(function(str) { return parseInt(str, 10); });
    IP.drop(ctlfieldsP);
    totalFieldLength = fields.reduce(function(acc, n) { return acc + n; }, 0);
    // Prime one pending IP per input port.
    return Promise.all(inportArray.map(function(inport) {
      return inport.receive();
    }));
  }).then(function(_ips) {
    ips = _ips;
    // Same "still has data" test as the merge loop uses (null means drained).
    portCount = _ips.filter(function(ip) { return ip !== null; }).length;
    return loop();
  });
};
// Faked FBP API, real version would return processes/ports that were defined in the network

// Slots for the ports of the single "collate" process; they start out
// undefined and are filled in by run().
var collate_CTLFIELDS;
var collate_IN0;
var collate_IN1;
var collate_OUT;

// IP helper stub: drop() does nothing and returns undefined.
var IP = {
  drop: function() {}
};

// Every input port name resolves to the CTLFIELDS slot ("name" is ignored).
var InputPort = {
  openInputPort: function (name) {
    var port = collate_CTLFIELDS;
    return port;
  }
};

// Every array port name resolves to the two IN slots, in index order
// ("name" is ignored).
var InputPortArray = {
  openInputPortArray: function(name) {
    var ports = [];
    ports.push(collate_IN0);
    ports.push(collate_IN1);
    return ports;
  }
};

// Every output port name resolves to the OUT slot ("name" is ignored).
var OutputPort = {
  openOutputPort: function (name) {
    var port = collate_OUT;
    return port;
  }
};
// Runs one collate implementation against a fixed set of mock ports.
//
// collate - function taking no arguments and returning a promise; it is
//           expected to pick its ports up through the faked FBP API.
// message - heading logged before the run starts.
//
// Logs the heading, installs fresh mock ports in the module-level slots,
// and returns a promise that logs the collected output array once collate's
// promise resolves.
function run(collate, message) {
  console.log(message);

  var controlValues = ['1'];
  var masterValues = ['1,m1','2,m2','3,m3'];
  var detailValues = ['1,d11','1,d12','2,d21','3,d31','3,d32','3,d33','4,d41'];

  collate_CTLFIELDS = new MockInPort(controlValues);
  collate_IN0 = new MockInPort(masterValues);
  collate_IN1 = new MockInPort(detailValues);
  collate_OUT = new MockOutPort();

  function printOutput() {
    console.log(collate_OUT.array);
  }

  return collate().then(printOutput);
}
// Demo driver: run both collate variants one after the other. The second run
// only starts after the first run's promise has resolved, because run()
// re-assigns the shared mock ports. The chain is terminated with .done().
Promise.resolve()
  .then(function runCoroutineVariant() {
    return run(collate1, "collate (using 'Bluebird.coroutine' and 'yield')");
  })
  .then(function runPromiseVariant() {
    return run(collate2, "collate2 (using promises only)");
  })
  .done();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment