In many of my libraries, I like to program stream transforms using this simple format. It can then be easily adapted to virtually any stream interface.
function transform(emit) {
  // Given an emit function as input, return a new emit function.
  // Per-stream state can be initialized here and is closed over by the returned fn.
  return function (item) {
    // When the new emit function is called, process the input and call
    // the old emit zero or more times.
  };
}
A trivial example is a transform that takes arbitrary binary chunks and emits strings terminated by newlines.
function deline(emit) {
  var chunks = [];
  return function (chunk) {
    var start = 0;
    for (var i = 0, l = chunk.length; i < l; i++) {
      if (chunk[i] === 0x0a) {
        var end = i + 1;
        chunks.push(chunk.slice(start, end));
        emit(Buffer.concat(chunks).toString());
        chunks.length = 0;
        start = end;
      }
    }
    chunks.push(chunk.slice(start));
  };
}
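To see how it behaves, here is a small standalone usage sketch (it repeats deline so the snippet runs on its own, and assumes Node's Buffer):

```javascript
function deline(emit) {
  var chunks = [];
  return function (chunk) {
    var start = 0;
    for (var i = 0, l = chunk.length; i < l; i++) {
      if (chunk[i] === 0x0a) {
        var end = i + 1;
        chunks.push(chunk.slice(start, end));
        emit(Buffer.concat(chunks).toString());
        chunks.length = 0;
        start = end;
      }
    }
    chunks.push(chunk.slice(start));
  };
}

// Feed it chunks that split lines at arbitrary points and
// collect the emitted strings.
var lines = [];
var write = deline(function (line) { lines.push(line); });
write(Buffer.from("hello "));
write(Buffer.from("world\nsecond li"));
write(Buffer.from("ne\n"));
// lines is now ["hello world\n", "second line\n"]
```

Note that partial lines are buffered across calls, so a line split over three TCP packets still comes out as one string.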
The input must be handled synchronously: any calls to emit must happen before the returned function returns. It is illegal to start a non-blocking operation and then call emit on a later tick. If you break this rule, the transform can't be used to adapt streams that implement back-pressure. On the upside, since the entire transform is sync, you can simply throw when there are errors.
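For example, a hypothetical downstream transform that parses each line as JSON can just throw on malformed input, and the caller catches it on the same tick (parseJson is an illustrative name, not part of any library):

```javascript
// Hypothetical transform: parse newline-delimited JSON lines into objects.
// Because the transform runs synchronously, a bad line throws right at the
// call site instead of surfacing on a later tick.
function parseJson(emit) {
  return function (line) {
    emit(JSON.parse(line)); // throws SyntaxError synchronously on bad input
  };
}

var objects = [];
var write = parseJson(function (obj) { objects.push(obj); });
write('{"n":1}\n');      // ok, emits the parsed object
try {
  write('not json\n');   // throws synchronously
} catch (err) {
  // handle the parse error here, on the same tick
}
```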
Suppose you have a simple pull-style stream with the following interface:
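One plausible shape for such a pull-style stream (an assumption; the exact interface may differ) is a plain read function: each call requests one item, the callback receives (err, item), and item === undefined signals end of stream. A toy source with that shape:

```javascript
// Hypothetical pull-style source: a stream is just a read function.
// Nothing is produced until the consumer asks, which is what makes
// back-pressure natural in a pull model.
function fromArray(items) {
  var i = 0;
  return function read(callback) {
    if (i < items.length) return callback(null, items[i++]);
    callback(); // no more items: end of stream
  };
}

var read = fromArray(["a", "b"]);
read(function (err, item) {
  // first call delivers "a"; a later call with no items left
  // delivers item === undefined
});
```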
This is easy to create and pretty easy to consume when using tools like
gen-run
with ES6 generators.

Suppose you want to transform a stream of raw TCP chunks into newline-terminated strings. With a generic adapter that applies one of these transforms to my read-stream interface, it only takes one extra line.
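What such an adapter could look like, sketched under stated assumptions: a pull-style read(callback) interface where the callback receives (err, item) and item === undefined means end of stream; applyTransform is a hypothetical name, not the actual library code.

```javascript
// Hypothetical adapter: wrap a pull-style read function with a transform,
// returning a new read function of the same shape.
function applyTransform(read, transform) {
  var queue = [];   // items the transform emitted but not yet delivered
  var done = false;
  var write = transform(function (item) { queue.push(item); });
  return function next(callback) {
    if (queue.length) return callback(null, queue.shift());
    if (done) return callback();
    read(function (err, item) {
      if (err) return callback(err);
      if (item === undefined) { done = true; return callback(); }
      try { write(item); }     // transforms are sync, so a throw is an error
      catch (e) { return callback(e); }
      next(callback);          // transform may emit zero items: pull again
    });
  };
}

// Usage with a toy source and a transform that emits zero or more items:
var data = [1, 2, 3, 4];
function source(callback) { callback(null, data.shift()); }
function evens(emit) {
  return function (n) { if (n % 2 === 0) emit(n); };
}
var readEvens = applyTransform(source, evens);
```

Because the adapter only calls the upstream read when the consumer asks and the queue is empty, back-pressure is preserved: a slow consumer never forces the source to produce ahead of demand.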