
@creationix
Last active August 29, 2015 14:06
Transform Interface

In many of my libraries, I like to program stream transforms using this simple format. It can then be easily adapted to virtually any stream interface.

function transform(emit) {
  // Given an emit function as input, return a new emit function.
  // Per-stream state can be initialized here and is closed over by the returned fn.
  return function (item) {
    // When the new emit function is called, process the input and call
    // the old emit zero or more times.
  };
}

A trivial example is a transform that takes arbitrary binary chunks and emits strings terminated by newlines.

function deline(emit) {
  var chunks = [];
  return function (chunk) {
    var start = 0;
    for (var i = 0, l = chunk.length; i < l; i++) {
      if (chunk[i] === 0x0a) {
        var end = i + 1;
        chunks.push(chunk.slice(start, end));
        emit(Buffer.concat(chunks).toString());
        chunks.length = 0;
        start = end;
      }
    }
    chunks.push(chunk.slice(start));
  };
}

The input must be handled synchronously: you must call emit before returning. It is illegal to start some non-blocking operation and then call emit on a later tick. If you don't follow this rule, the transform can't be used to adapt to streams that implement back-pressure.

Also, since the entire transform is sync, you can simply throw when there are errors.
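
As a concrete illustration of both rules, here is a hypothetical sketch (not from the original gist; parseJsonLines is my name) of a transform that parses newline-delimited JSON, emitting synchronously and throwing directly on bad input:

// Hypothetical example: parse each incoming line as a JSON value.
function parseJsonLines(emit) {
  return function (line) {
    // emit is called synchronously; JSON.parse throws on bad input,
    // which is fine since the entire transform is sync.
    emit(JSON.parse(line));
    // Illegal variant -- deferring emit to a later tick breaks back-pressure:
    // setTimeout(function () { emit(JSON.parse(line)); }, 0);
  };
}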

@creationix (Author)

Suppose you have a simple pull-style stream with the following interface:

// The stream is the read function, very minimal example.
function read(callback) {
  // Do something and call the callback when data is ready
  // This is a "zalgo" node callback: it takes (err, value)
  // and can be called either before read returns or sometime after.
}
// When the stream is done, the callback will be called with undefined as the value.
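
For reference, a minimal sketch of such a read stream backed by an in-memory array (arrayToRead is my name, not part of the gist):

// Hypothetical helper: expose an array as a pull-style read stream.
function arrayToRead(array) {
  var i = 0;
  return function read(callback) {
    if (i < array.length) return callback(null, array[i++]);
    callback(); // calling back with undefined signals end of stream
  };
}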

This is easy to create and pretty easy to consume when using tools like gen-run with ES6 generators:

run(function* () {
  var read = api.to.createStream(some, args);
  do {
    var item = yield read;
    console.log(item);
  } while (item !== undefined);
});
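
Without generators, the same loop can be written as a recursive callback; a sketch assuming only the read interface above:

// Plain-callback consumption of a read stream.
function consume(read) {
  read(function onItem(err, item) {
    if (err) throw err;
    if (item === undefined) return; // stream is done
    console.log(item);
    read(onItem); // pull the next item
  });
}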

Suppose you had a stream of raw TCP chunks and wanted newline-terminated strings instead. With this generic adapter that applies transforms to my read stream interface, you would only need to add one line to the code above.

// In the above code, before the loop, add:
read = applyTransform(read, deline);

// Here is the generic applyTransform helper.
// Given a read stream and a transform function, return a new read stream.
function applyTransform(read, transform) {
  var queue = [];
  var done = false;
  var process = transform(emit);

  // Collect items as the transform emits them; the sync contract
  // guarantees the queue is fully populated before check() looks at it.
  function emit(item) {
    queue.push(item);
  }

  return function (callback) {
    if (done) return callback();
    return check();
    function check() {
      if (queue.length) return callback(null, queue.shift());
      read(onRead);
    }
    function onRead(err, chunk) {
      if (err) return callback(err);
      if (chunk === undefined) {
        done = true;
        return callback();
      }
      try { process(chunk); }
      catch (err) { return callback(err); }
      check();
    }
  };
}
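
Putting the pieces together with the hypothetical arrayToRead and consume helpers sketched earlier, a quick smoke test might look like:

// Feed two raw chunks through deline and print the resulting lines.
var read = arrayToRead([Buffer.from('foo\nba'), Buffer.from('r\nbaz\n')]);
read = applyTransform(read, deline);
consume(read); // logs "foo\n", then "bar\n", then "baz\n"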

@creationix (Author)

Also, suppose you wanted to use the transform on a static array. That's even easier.

var chunks = [...]; // Large array of captured TCP chunks
var lines = applyTransform(chunks, deline);

function applyTransform(array, transform) {
  var out = [];
  // Since forEach is push style, we just inject our transform
  // between forEach and its callback.
  array.forEach(transform(function (item) {
    out.push(item);
  }));
  return out;
}
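
For example, with illustrative chunk contents of my own choosing:

var lines = applyTransform([Buffer.from('foo\nba'), Buffer.from('r\n')], deline);
// lines is now ['foo\n', 'bar\n']; a trailing partial line would be dropped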

@creationix (Author)

This would also adapt to CSP channels, Node streams, Rx streams, etc. The adapter can preserve back-pressure for all streams that implement it. This simple transform interface can be used for any CPU-bound and side-effect-free transducer. I wish more protocols like HTTP and WebSocket were implemented as simple transforms like this instead of being tightly coupled to some concrete stream and socket APIs.

(For larger examples, see my transforms for http and websocket.)
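
As one sketch of the adapter claim above (toNodeStream is my name, not from the gist), such a transform can be lifted into a Node.js Transform stream; back-pressure is preserved because process pushes synchronously before the callback fires:

var Transform = require('stream').Transform;

// Hypothetical adapter: lift a sync transform into a Node Transform stream.
function toNodeStream(transform) {
  var stream = new Transform();
  var process = transform(function (item) {
    stream.push(item); // deliver each emitted item downstream
  });
  stream._transform = function (chunk, encoding, callback) {
    try { process(chunk); } // emits zero or more items synchronously
    catch (err) { return callback(err); }
    callback();
  };
  return stream;
}

// Usage: socket.pipe(toNodeStream(deline)).pipe(destination);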

@Gozala commented Sep 22, 2014

@creationix I think there are a few design choices where I personally prefer the ones made by transducers:

  1. "Calling emit async is illegal" is just a stated rule, which is a weak enforcement of the interface for my taste. I prefer the transducers interface, since it's based on return rather than invoke, so doing it async is just not an option.
  2. Transducers support early termination, which is handy for implementing combinators like takeWhile, for example. From what I can see, the transforms interface has no support for aborting a producer.

@creationix (Author)

Yep, you're correct on both counts. What I like about my interface is that it's dead simple. I don't have to worry about having some abstract interface for producing new array-like things, or @@iterator, or anything like that. I just create a function that takes a simple function and returns a new function. The reduce part is implicitly enabled by per-stream state local to the closure. The task of interfacing with the various APIs is pushed outside the code doing the actual transform, where that adapter code can become quite complex.

@creationix (Author)

In other words, my interface is both simpler and more powerful. So powerful, in fact, that I have to add artificial constraints to make it play nice with the widest range of applications. (It would still function without the constraint; it would just break back-pressure.) It's up to you to judge when this is appropriate, but I usually prefer this style.

@grncdr commented Sep 22, 2014

It seems impossible to enforce the "emit must be called sync" constraint. This is because you must call transform(emit) only once per unique "upstream" emit (to initialize any state the transform may have), and thus you can't wrap emit so that it closes over a per-input value.

Supporting early termination means that transducers have a similar constraint: they must not call the reducing function again with input after they have seen a reduced? value, as stated here. In general, Rich Hickey seems quite OK with these kinds of "verbal contracts" in the interface of transducers.

It's an interesting question whether one could maintain the complete genericity of transducers or simple-streams without relying on such unenforceable constraints.
