Skip to content

Instantly share code, notes, and snippets.

@phated
Last active October 5, 2016 20:49
Show Gist options
  • Save phated/2b65641136ea54be95033b53be76ebce to your computer and use it in GitHub Desktop.
Save phated/2b65641136ea54be95033b53be76ebce to your computer and use it in GitHub Desktop.

Trying to solve this problem but I think I lack some knowledge of pull-streams (still a newbie).

I have a single source and a single sink (I don't actually control these), my goal is to feed the source into multiple through streams and then combine the results of each of them and pass into the sink. The order doesn't matter.

       / through A \
source - through B - sink
       \ through C /
pull(
  pull.values([1, 2, 3]),
  forkAndMerge([
    pull.map(function(val) {
      return val * 2;
    }),
    pull.map(function(val) {
      return val * 3;
    }),
    pull.map(function(val) {
      return val * 4;
    })
  ]),
  // I'd be fine with pull.flatten() here if that's the only way to make it work
  pull.collect(function(err, values) {
    // values should contain 2, 4, 6, 3, 6, 9, 4, 8, 12 in any order
  })
)
@phated
Copy link
Author

phated commented Oct 5, 2016

Here is my current solution that seems to be working:

'use strict';

var pull = require('pull-stream');
var Notify = require('pull-notify');
var through = require('pull-through');

function fork(throughs) {
  var notify = Notify();

  if (!Array.isArray(throughs)) {
    throughs = [throughs];
  }

  throughs.forEach(function(stream) {
    var queue;

    pull(
      notify.listen(),
      pull.map(function(event) {
        queue = event.queue;
        return event.data;
      }),
      stream,
      pull.drain(function(data) {
        if (queue) {
          queue(data);
        }
      }, function() {
        queue = null;
      })
    );
  });

  return through(function(data) {
    notify({ data: data, queue: this.queue });
  }, function() {
    notify.end();
    this.queue(null);
  });
}

module.exports = fork;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment