Last active
August 29, 2015 14:15
-
-
Save tlrobinson/c8407e0e3f31a0da5400 to your computer and use it in GitHub Desktop.
Proof-of-concept "classical" FBP using JavaScript streams/promises, and optionally generators via Bluebird's "coroutine" method.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Runs in iojs or node.js versions >= 0.11.2 | |
// | |
// npm install bluebird | |
// node collate.js | |
// | |
var Promise = require('bluebird'); | |
var stream = require('stream'); | |
var util = require('util'); | |
// simple input port that reads the value of IPs from an array of values | |
function MockInPort(array) { | |
stream.Readable.call(this, { objectMode: true }); | |
this.array = array || []; | |
} | |
util.inherits(MockInPort, stream.Readable); | |
MockInPort.prototype._read = function(size) { | |
// ignore "size" for now, just read one | |
if (this.array.length > 0) { | |
this.push({ contents: this.array.shift() }); | |
} else { | |
this.push(null); | |
} | |
} | |
// simple output port that writes the contents of IPs to an array | |
function MockOutPort(array) { | |
stream.Writable.call(this, { objectMode: true }); | |
this.array = array || []; | |
} | |
util.inherits(MockOutPort, stream.Writable); | |
MockOutPort.prototype._write = function(chunk, encoding, callback) { | |
this.array.push(chunk.contents); | |
callback(); | |
} | |
// patch all readable streams with a "receive" method which returns a Promise | |
stream.Readable.prototype.receive = function(size) { | |
var self = this; | |
var data = this.read(); | |
if (data === null) { | |
return new Promise(function (resolve, reject) { | |
self.once('readable', function() { | |
resolve(self.read()); | |
}); | |
self.once('end', function() { | |
resolve(null); | |
// reject('Stream ended'); | |
}); | |
}); | |
} else { | |
return Promise.resolve(data); | |
} | |
} | |
// patch all writable streams with a "send" method | |
stream.Writable.prototype.send = function(ip) { | |
this.write(ip); | |
} | |
// port of JSFBP's collate component using Bluebird's "coroutine" method and "yield" instead of Fibers | |
var collate1 = Promise.coroutine(function *collate1() { | |
var ctlfields = InputPort.openInputPort('CTLFIELDS'); | |
var inportArray = InputPortArray.openInputPortArray('IN'); | |
var outport = OutputPort.openOutputPort('OUT'); | |
var ctlfieldsP = yield ctlfields.receive(); | |
IP.drop(ctlfieldsP); | |
var fields = ctlfieldsP.contents.split(',').map(function(str) { return parseInt(str); }); | |
var totalFieldLength = fields.reduce(function(acc, n) { return acc + n; }, 0); | |
var portCount = inportArray.length; | |
var ips = []; | |
for (var index = 0; index < inportArray.length; index++) { | |
ips[index] = yield inportArray[index].receive(); | |
if (ips[index] === null) { | |
portCount--; | |
} | |
} | |
while (portCount) { | |
var lowestIndex = 0; | |
var lowestKey = "\uffff"; | |
ips.forEach(function(ip, portIndex) { | |
if (ip !== null) { | |
var key = ip.contents.substring(0, totalFieldLength); | |
if (key < lowestKey) { | |
lowestKey = key; | |
lowestIndex = portIndex; | |
} | |
} | |
}); | |
outport.send(ips[lowestIndex]); | |
ips[lowestIndex] = yield inportArray[lowestIndex].receive(); | |
if (ips[lowestIndex] === null) { | |
portCount--; | |
} | |
} | |
}); | |
// port of JSFBP's collate component using promises instead of Fibers | |
var collate2 = function collate2() { | |
var ctlfields = InputPort.openInputPort('CTLFIELDS'); | |
var inportArray = InputPortArray.openInputPortArray('IN'); | |
var outport = OutputPort.openOutputPort('OUT'); | |
var ips, portCount, totalFieldLength; | |
function loop() { | |
if (portCount > 0) { | |
var lowestIndex = 0; | |
var lowestKey = "\uffff"; | |
ips.forEach(function(ip, portIndex) { | |
if (ip !== null) { | |
var key = ip.contents.substring(0, totalFieldLength); | |
if (key < lowestKey) { | |
lowestKey = key; | |
lowestIndex = portIndex; | |
} | |
} | |
}); | |
outport.send(ips[lowestIndex]); | |
return inportArray[lowestIndex].receive().then(function(ip) { | |
ips[lowestIndex] = ip; | |
if (ips[lowestIndex] === null) { | |
portCount--; | |
} | |
return loop(); | |
}); | |
} else { | |
return; | |
} | |
} | |
return ctlfields.receive().then(function (ctlfieldsP) { | |
IP.drop(ctlfieldsP); | |
var fields = ctlfieldsP.contents.split(',').map(function(str) { return parseInt(str); }); | |
totalFieldLength = fields.reduce(function(acc, n) { return acc + n; }, 0); | |
return Promise.all(inportArray.map(function(inport) { | |
return inport.receive(); | |
})); | |
}).then(function(_ips) { | |
ips = _ips; | |
portCount = _ips.filter(function(ip) { return !!ip; }).length; | |
return loop(); | |
}); | |
}; | |
// Faked FBP API, real version would return processes/ports that were defined in the network | |
var collate_CTLFIELDS, collate_IN0, collate_IN1, collate_OUT; | |
var IP = { | |
drop: function() { | |
} | |
}; | |
var InputPort = { | |
openInputPort: function (name) { | |
return collate_CTLFIELDS; | |
} | |
}; | |
var InputPortArray = { | |
openInputPortArray: function(name) { | |
return [collate_IN0, collate_IN1]; | |
} | |
}; | |
var OutputPort = { | |
openOutputPort: function (name) { | |
return collate_OUT; | |
} | |
}; | |
function run(collate, message) { | |
console.log(message) | |
collate_CTLFIELDS = new MockInPort(['1']); | |
collate_IN0 = new MockInPort(['1,m1','2,m2','3,m3']); | |
collate_IN1 = new MockInPort(['1,d11','1,d12','2,d21','3,d31','3,d32','3,d33','4,d41']); | |
collate_OUT = new MockOutPort(); | |
return collate().then(function() { | |
console.log(collate_OUT.array); | |
}); | |
} | |
Promise.resolve().then(function() { | |
return run(collate1, "collate (using 'Bluebird.coroutine' and 'yield')"); | |
}).then(function() { | |
return run(collate2, "collate2 (using promises only)"); | |
}).done(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment