Skip to content

Instantly share code, notes, and snippets.

@tlrobinson
Created February 14, 2015 03:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tlrobinson/0e1ee15a5aec473c944b to your computer and use it in GitHub Desktop.
Save tlrobinson/0e1ee15a5aec473c944b to your computer and use it in GitHub Desktop.
var fbp = require("../..")
, InputPort = require('../../core/InputPort')
, InputPortArray = require('../../core/InputPortArray')
, OutputPort = require('../../core/OutputPort')
, OutputPortArray = require('../../core/OutputPortArray')
, IP = require('../../core/IP');
/**
 * FBP component: buffers every IP received on the 'IN' port until the stream
 * ends, then sends the buffered IPs to the 'OUT' port in ascending order of
 * their `contents` (compared with the `<` / `>` operators).
 *
 * Takes no arguments and returns nothing; all input and output goes through
 * the ports opened via InputPort / OutputPort.
 */
function sort() {
  var inPort = InputPort.openInputPort('IN');
  var outPort = OutputPort.openOutputPort('OUT');
  var ips = [];
  var ip;

  // A falsy value from receive() is treated as end of stream.
  while ((ip = inPort.receive())) {
    ips.push(ip);
  }

  // Three-way comparator: it must return 0 for equal contents, otherwise the
  // comparator is inconsistent (cmp(a, b) and cmp(b, a) both positive) and
  // the resulting order of equal keys is implementation-defined.
  ips.sort(function(ip1, ip2) {
    if (ip1.contents < ip2.contents) {
      return -1;
    }
    if (ip1.contents > ip2.contents) {
      return 1;
    }
    return 0;
  });

  ips.forEach(function(sortedIp) {
    outPort.send(sortedIp);
  });
}
/**
 * FBP component: merges the streams arriving on the elements of the 'IN'
 * array port into a single stream on 'OUT'.
 *
 * It keeps one pending ("head") IP per input port, repeatedly sends the head
 * with the smallest `contents`, and refills that slot from the same port.
 * The output is sorted provided each input stream is already sorted.
 * When several heads compare equal, the lowest port index is sent first.
 *
 * Takes no arguments and returns nothing.
 */
function mergesorted() {
  var inPorts = InputPortArray.openInputPortArray('IN');
  var outPort = OutputPort.openOutputPort('OUT');

  // heads[i] is the next unsent IP from inPorts[i]; it becomes falsy once
  // that port's receive() reports end of stream.
  var heads = [];
  inPorts.forEach(function(inPort, index) {
    heads[index] = inPort.receive();
  });

  /**
   * Returns the index of the pending head with the smallest contents,
   * or -1 when every input is exhausted.
   */
  function getSmallestIndex() {
    var smallestIndex = -1;
    heads.forEach(function(ip, index) {
      if (!ip) {
        return;
      }
      // Compare against the current best head rather than an Infinity
      // sentinel: a sentinel can never be beaten by contents that are
      // Infinity or non-numeric strings, which would end the merge early
      // and drop the remaining IPs.
      if (smallestIndex < 0 || ip.contents < heads[smallestIndex].contents) {
        smallestIndex = index;
      }
    });
    return smallestIndex;
  }

  var index;
  while ((index = getSmallestIndex()) >= 0) {
    outPort.send(heads[index]);
    heads[index] = inPorts[index].receive();
  }
}
/**
 * FBP component: distributes the IPs received on 'IN' across the elements of
 * the 'OUT' array port in rotation. The first IP goes to element 0, the next
 * to element 1, and so on, wrapping back to 0 after the last element.
 *
 * Takes no arguments and returns nothing.
 */
function roundrobin() {
  var source = InputPort.openInputPort('IN');
  var targets = OutputPortArray.openOutputPortArray('OUT');
  var next = 0;

  // A falsy value from receive() ends the loop.
  for (var packet = source.receive(); packet; packet = source.receive()) {
    targets[next].send(packet);
    next = (next + 1) % targets.length;
  }
}
// Spec for the `sort` component: sender -> sort -> receiver.
describe('sort', function() {
  it('should buffer and sort a stream', function(){
    // Array handed to MockReceiver and asserted on after the run.
    var collected = [];
    var expected = [0,1,2,3,4,5,6,7,8,9];

    // Processes are defined in pipeline order.
    var source = fbp.defProc(MockSender([1,4,2,9,5,0,3,8,7,6]));
    var sortProc = fbp.defProc(sort);
    var sink = fbp.defProc(MockReceiver(collected));

    // The trailing 5 is presumably the connection capacity; confirm
    // against fbp.connect.
    fbp.connect(source, 'OUT', sortProc, 'IN', 5);
    fbp.connect(sortProc, 'OUT', sink, 'IN');

    // An error thrown by the run is only logged; the assertion below still
    // executes afterwards.
    try {
      fbp.run({ trace: false });
    } catch (err) {
      console.warn(err);
    }

    expect(collected).to.deep.equal(expected);
  });
});
// Specs for the `mergesorted` component, alone and combined with
// `roundrobin` and `sort`.
describe('mergesorted', function() {
  // Runs the network defined so far. An error thrown by the run is only
  // logged, so the caller's assertion still executes afterwards.
  function runNetwork() {
    try {
      fbp.run({ trace: false });
    } catch (err) {
      console.warn(err);
    }
  }

  it('merge two sorted streams into a single sorted stream', function() {
    // Array handed to MockReceiver and asserted on after the run.
    var collected = [];

    // Two senders, each emitting an already-sorted sequence.
    var firstSource = fbp.defProc(MockSender([0,1,3,4,7,9]));
    var secondSource = fbp.defProc(MockSender([2,5,6,8]));
    var merger = fbp.defProc(mergesorted);
    var sink = fbp.defProc(MockReceiver(collected));

    // Each sender feeds its own element of the merger's IN array port.
    fbp.connect(firstSource, 'OUT', merger, 'IN[0]', 5);
    fbp.connect(secondSource, 'OUT', merger, 'IN[1]', 5);
    fbp.connect(merger, 'OUT', sink, 'IN');

    runNetwork();

    expect(collected).to.deep.equal([0,1,2,3,4,5,6,7,8,9]);
  });

  it('split a stream through two sort componets and merge the resulting streams', function() {
    // Array handed to MockReceiver and asserted on after the run.
    var collected = [];

    // sender -> roundrobin -> (sort, sort) -> mergesorted -> receiver
    var source = fbp.defProc(MockSender([1,4,2,9,5,0,3,8,7,6]));
    var splitter = fbp.defProc(roundrobin);
    var firstSorter = fbp.defProc(sort);
    var secondSorter = fbp.defProc(sort);
    var merger = fbp.defProc(mergesorted);
    var sink = fbp.defProc(MockReceiver(collected));

    fbp.connect(source, 'OUT', splitter, 'IN', 5);
    fbp.connect(splitter, 'OUT[0]', firstSorter, 'IN', 5);
    fbp.connect(splitter, 'OUT[1]', secondSorter, 'IN', 5);
    fbp.connect(firstSorter, 'OUT', merger, 'IN[0]');
    fbp.connect(secondSorter, 'OUT', merger, 'IN[1]');
    fbp.connect(merger, 'OUT', sink, 'IN');

    runNetwork();

    expect(collected).to.deep.equal([0,1,2,3,4,5,6,7,8,9]);
  });
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment