Skip to content

Instantly share code, notes, and snippets.

@mojodna
Last active June 19, 2018 04:17
Show Gist options
  • Save mojodna/8175805 to your computer and use it in GitHub Desktop.
Save mojodna/8175805 to your computer and use it in GitHub Desktop.
Some random Node.js stream classes.
"use strict";
var stream = require("stream"),
util = require("util");
var buffertools = require("buffertools");
var BinarySplitter = function(delimiter) {
stream.Transform.call(this);
this.delimiter = delimiter || "\n";
var pending = new Buffer(0);
this._transform = function(chunk, encoding, callback) {
var buffer = Buffer.concat([pending, chunk]),
offset = 0;
while (offset < buffer.length) {
var idx = buffertools.indexOf(buffer, this.delimiter, offset);
if (idx < 0) {
break;
}
this.push(buffer.slice(offset, idx + 1));
offset = idx + 1;
}
pending = buffer.slice(offset);
return setImmediate(callback);
};
this._flush = function(callback) {
if (pending.length > 0) {
this.push(pending);
}
return setImmediate(callback);
};
};
util.inherits(BinarySplitter, stream.Transform);
module.exports = BinarySplitter;
"use strict";
var stream = require("stream"),
util = require("util");
var Chunker = function(size) {
stream.Transform.call(this);
this.chunkSize = size || 512;
this._transform = function(chunk, encoding, callback) {
var offset = 0,
end;
while (offset < chunk.length) {
end = Math.min(offset + this.chunkSize, chunk.length);
this.push(chunk.slice(offset, end));
offset += this.chunkSize;
}
return setImmediate(callback);
};
};
util.inherits(Chunker, stream.Transform);
module.exports = Chunker;
"use strict";
var stream = require("stream"),
util = require("util");
var Generator = function() {
stream.Readable.call(this);
this.index = 0;
this.numbers = [4, 8, 15, 16, 23, 42];
this._read = function(size) {
this.push(new Buffer(this.numbers[this.index++ % this.numbers.length].toString() + "\n"));
};
};
util.inherits(Generator, stream.Readable);
module.exports = Generator;
"use strict";
var stream = require("stream"),
util = require("util");
// TODO write this up as "how to merge readable streams"
var InfinitePassThrough = function() {
stream.Transform.call(this);
this._transform = function(chunk, encoding, callback) {
// only pass data on if something's listening and we're flowing
if (this._readableState.pipesCount > 0 &&
this._readableState.buffer.length === 0) {
this.push(chunk);
}
return callback();
};
setInterval(function() {
console.log("%d clients.", this._readableState.pipesCount);
}.bind(this), 5000).unref();
// overwrite Transform's end() function with a mangled version that doesn't
// actually end.
this.end = function(chunk, encoding, cb) {
if (typeof chunk === 'function') {
cb = chunk;
chunk = null;
encoding = null;
} else if (typeof encoding === 'function') {
cb = encoding;
encoding = null;
}
cb = cb || function() {};
if (typeof chunk !== 'undefined' && chunk !== null) {
this.write(chunk, encoding);
}
return cb();
};
};
util.inherits(InfinitePassThrough, stream.Transform);
module.exports = InfinitePassThrough;
"use strict";
var stream = require("stream"),
util = require("util");
var Sampler = function(rate) {
stream.Transform.call(this);
this._transform = function(chunk, encoding, callback) {
if (Math.random() * 100 <= rate) {
this.push(chunk);
}
return callback();
};
};
util.inherits(Sampler, stream.Transform);
module.exports = Sampler;
"use strict";
var stream = require("stream"),
util = require("util");
var SlowReceiver = function(delay) {
stream.Transform.call(this);
this.delay = delay || 1000;
this._transform = function(chunk, encoding, callback) {
return setTimeout(function() {
this.push(chunk);
return callback();
}.bind(this), this.delay);
};
};
util.inherits(SlowReceiver, stream.Transform);
module.exports = SlowReceiver;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment