Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rhalff/5778689 to your computer and use it in GitHub Desktop.
Save rhalff/5778689 to your computer and use it in GitHub Desktop.
// benchmark/object-stream-throughput.js
//
// Shows performance of pipe()ing new streams in object mode
// through Transform streams.
// Simulated parser yield, expressed in objects per input byte: about 12
// objects for every KB of input XML.
var OBJECTS_PER_B = 12 / 1024;
// Size, in bytes, of each buffer requested from the BufferSource (64 KB).
var SOURCE_WRITE_SIZE = 64 * 1024;
var common = require('./common');
var net = require('net');
var stream = require('stream');
var util = require('util');
// Benchmark modes, keyed by name. Every mode supplies three factories:
//   createSource(callback) - hands the source stream to callback(err, stream)
//   createTransform()      - builds one intermediary Transform stage
//   createSink()           - builds the counting Writable at the end
// Insertion order matters: Object.keys(modeMap) feeds the benchmark config.
var modeMap = {};

// Raw buffers from end to end; the sink adds up chunk lengths.
// NullTransform and Sink construct themselves when called without `new`,
// so the constructors can be used directly as factories.
modeMap.dataStreamsOnly = {
  createSource: createNetworkBufferSource,
  createTransform: NullTransform,
  createSink: Sink
};

// Buffers are turned into objects at the source; every later stage runs in
// object mode and the sink counts one per object.
modeMap.dataToObject = {
  createSource: createNetworkParsedObjectSource,
  createTransform: function() {
    return new NullTransform({objectMode: true});
  },
  createSink: function() {
    return new Sink({objectMode: true});
  }
};
// Benchmark parameters:
//   dur   - run time; test() converts it to a number and uses it as seconds
//   steps - how many intermediary transforms sit between source and sink
//   mode  - one of the keys of modeMap
// NOTE(review): presumably common.createBenchmark invokes test() for the
// combinations of these values - confirm against ./common.
var benchOptions = {
  dur: [5],
  steps: [0, 1, 2, 4, 8],
  mode: Object.keys(modeMap)
};
var bench = common.createBenchmark(test, benchOptions);
// Benchmark entry point: runs one throughput measurement for the given
// configuration.
//
// conf.dur   - duration in seconds (coerced to a number)
// conf.steps - number of intermediary transforms (coerced to a number)
// conf.mode  - key into modeMap selecting the stream factories
//
// Starts the benchmark clock, runs the measurement, then reports the sink's
// count through bench.end(). An error from the measurement is rethrown.
function test(conf) {
  var durationSeconds = +conf.dur;
  var transformCount = +conf.steps;
  var factories = modeMap[conf.mode];

  function onMeasured(failure, total) {
    if (failure) {
      throw failure;
    }
    bench.end(total);
  }

  bench.start();
  testObjectStreamThroughput(durationSeconds, transformCount, factories, onMeasured);
}
// Readable stream that cranks out an endless sequence of buffers of the
// requested size.
//
// blockSizeInBytes - length, in bytes, of every buffer pushed.
//
// May be called with or without `new`.
function BufferSource(blockSizeInBytes) {
  if (!(this instanceof BufferSource)) {
    return new BufferSource(blockSizeInBytes);
  }
  stream.Readable.call(this);
  this.blockSizeInBytes = blockSizeInBytes;
}
util.inherits(BufferSource, stream.Readable);

// Pushes one fresh buffer per read request; never signals end-of-stream.
BufferSource.prototype._read = function() {
  // Buffer.alloc() replaces the deprecated `new Buffer(size)` constructor,
  // which triggers a runtime deprecation warning (DEP0005). The buffer is
  // zero-filled, so no uninitialized memory is ever sent down the pipe.
  this.push(Buffer.alloc(this.blockSizeInBytes));
};
// Create a BufferSource on the other side of a network socket connection.
// This keeps the event loop cranking without having to call setImmediate
// manually from our readers or writers. It's also a better simulation of
// the real situation: processing dozens to thousands of streams handling
// gigabytes of data, while avoiding running out of memory when the sources
// can be read faster than the clients can be written to.
//
// callback(err, client) - receives the connected client socket, whose
//   readable side carries the BufferSource output. If listening or
//   connecting fails, it receives the error instead.
function createNetworkBufferSource(callback) {
  var host = '127.0.0.1';
  var server = net.createServer(function connected(c) {
    // Exactly one client is expected; stop accepting more so the listening
    // socket is not leaked. The established connection stays open.
    server.close();
    new BufferSource(SOURCE_WRITE_SIZE).pipe(c);
  });

  // Report bind failures through the callback instead of crashing on an
  // unhandled 'error' event.
  server.once('error', callback);

  // Port 0 asks the OS for a free ephemeral port, so repeated or concurrent
  // runs cannot collide on a hard-coded port number.
  server.listen(0, host, function listening() {
    server.removeListener('error', callback);

    var client = net.connect({ port: server.address().port, host: host });

    function onConnectError(err) {
      server.close();
      callback(err);
    }

    client.once('error', onConnectError);
    client.once('connect', function connected() {
      // After the hand-over, error handling belongs to the caller.
      client.removeListener('error', onConnectError);
      callback(null, client);
    });
  });
}
// Create a BufferSource on the other side of a network socket connection,
// AND transform it to objects using NetworkBufferToObjectTransform.
//
// callback(err, objectStream) - receives the parser stream that the socket
//   is piped into, or the error reported while creating the socket.
function createNetworkParsedObjectSource(callback) {
  // The parameter is named `socket` so it does not shadow the `stream` module.
  createNetworkBufferSource(function(err, socket) {
    if (err) {
      // Without this guard a failed connection would surface as a TypeError
      // on `undefined.pipe`, hiding the real cause.
      callback(err);
      return;
    }
    var parser = new NetworkBufferToObjectTransform(OBJECTS_PER_B);
    callback(null, socket.pipe(parser));
  });
}
// Transform stream standing in for a parser: consumes network buffers and
// emits small objects in their place.
//
// objectsPerByte - how many objects to emit for each input byte.
//
// The stream is created in object mode. May be called with or without `new`.
function NetworkBufferToObjectTransform(objectsPerByte) {
  if (this instanceof NetworkBufferToObjectTransform) {
    stream.Transform.call(this, {objectMode: true});
    this.objectsPerByte = objectsPerByte;
    // Running sequence number stamped onto every emitted object.
    this.idx = 0;
  } else {
    return new NetworkBufferToObjectTransform(objectsPerByte);
  }
}
util.inherits(NetworkBufferToObjectTransform, stream.Transform);

// Emits floor(objectsPerByte * chunk.length) objects of the form { idx: n }
// for each incoming chunk; the fractional part is dropped per chunk.
NetworkBufferToObjectTransform.prototype._transform = function(chunk, enc, done) {
  var remaining = (this.objectsPerByte * chunk.length) | 0;
  while (remaining > 0) {
    var sequence = this.idx;
    this.idx = sequence + 1;
    this.push({ idx: sequence });
    remaining -= 1;
  }
  done(null);
};
// Writable stream that discards its input and keeps a running total in
// `this.count`.
//
// options - passed straight to stream.Writable. When options.objectMode is
//   truthy every write adds 1 to the total; otherwise every write adds the
//   chunk's `length`.
//
// May be called with or without `new`.
function Sink(options) {
  if (!(this instanceof Sink)) {
    return new Sink(options);
  }
  stream.Writable.call(this, options);
  this.count = 0;
  // Pick the counting strategy once, up front, rather than on every write.
  var countsObjects = options && options.objectMode;
  this._getCount = countsObjects ?
    function(chunk) { return 1; } :
    function(chunk) { return chunk.length; };
}
util.inherits(Sink, stream.Writable);

// Adds the chunk's contribution to the total and acknowledges the write.
Sink.prototype._write = function(chunk, enc, done) {
  this.count += this._getCount(chunk);
  done(null);
};
// Transform stream that forwards every chunk unchanged.
//
// options - passed straight to stream.Transform (e.g. {objectMode: true}).
//
// May be called with or without `new`.
function NullTransform(options) {
  if (this instanceof NullTransform) {
    stream.Transform.call(this, options);
  } else {
    return new NullTransform(options);
  }
}
util.inherits(NullTransform, stream.Transform);

// Pushes the incoming chunk as-is, then signals completion.
NullTransform.prototype._transform = function(chunk, enc, done) {
  this.push(chunk);
  done(null);
};
// Builds the pipeline source -> `steps` transforms -> sink, lets it run for
// `seconds`, then reports how much the sink has counted.
//
// seconds    - how long to let data flow before sampling the sink
// steps      - number of intermediary transforms between source and sink
// modeConfig - factories: createSource(cb), createTransform(), createSink()
// callback(err, count) - receives the sink's `count` once the time is up,
//   or the error reported by createSource.
function testObjectStreamThroughput(seconds, steps, modeConfig, callback) {
  modeConfig.createSource(function(err, source) {
    if (err) {
      // Surface the real failure instead of crashing below on
      // `undefined.pipe`.
      callback(err);
      return;
    }
    var tail = source;
    for (var i = 0; i < steps; i++) {
      var intermediary = modeConfig.createTransform();
      tail.pipe(intermediary);
      tail = intermediary;
    }
    // Pipe the last stream into the sink.
    var sink = modeConfig.createSink();
    // Arm the timer before the final pipe() so the measurement window opens
    // no later than the moment data can start reaching the sink.
    setTimeout(function() { callback(null, sink.count); }, 1000 * seconds);
    tail.pipe(sink);
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment