Skip to content

Instantly share code, notes, and snippets.

@Gozala
Last active August 5, 2016 14:57
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save Gozala/5314269 to your computer and use it in GitHub Desktop.
Save Gozala/5314269 to your computer and use it in GitHub Desktop.
Signals

Signal

Signal represents collection of values (over time) associated with a single identity. It can be expressed as follows:

function signal(next) {
  next(1)
  next(2)
  next(3)
  // ...
}

End of signal

Signals in haskell and Elm have no notion of end that implies lot's of technical constraints at the implementation level. Also while this may work in pure languages it's very unnatural for language like JS and highly mutable environment it's running in. There for this definition of signal has notion of end. Finite signal can be expressed as follows:

function signal(next, end) {
  next(1)
  next(2)
  next(3)
  end()
}

Valid signal behavior

Since signals may be used to represent IO it's quite possible to have race conditions. If signal ends with an argument (different from null and undefined) it's treated as an error:

function signal(next, end) {
  next(1)
  end(Error("oops!"))
}

Inovking next or end after end was invoked is forbidden. Signals that do so are considered broken, although runtime enforcement of this is out of the scope of this definition. Preferably such enforcments can and will be done as a second tire utilities:

function normalize(signal, strict) {
  return function normalized(next, end) {
    var ended = false
    signal(function forwardNext(value) {
      if (ended) {
        if (strict) throw Error("Can not yield values from ended signal")
        else console.warn("Can not yield values from ended signal", signal, value)
      } else {
        return next(value)
      }
    }, function forwardEnd(error) {
      if (ended) {
        if (strict) throw Error("Can not end signal more than once")
        else console.warn("Can not end signal more than once", signal, value)
      }
    })
  }
}

Push & Pull

There is no single right choice when it comes to choosing between push & pull style streaming. They both have pros & cons and depending on problem scope diffirent choice makes it better fit. Push style streams can be a lot more efficent since they don't require chopping at each transformation. On the other hand reading just a part of stream, pausing & resuming than again is a lot harder with push style streams, specilly in pure functional style.

This definition of signal attempts to take hybrid approach. It favors push style for the efficency, but lets consumer downgrade to pull style. Unlike linked lists of head, tail pairs, there are no predifined chunks. Signal starts pushing values, but consumer can signal it to return rest in form of other signal:

function range(from, to) {
  return signal(next, end) {
    var value = from
    while (value < to) {
      var continuation = next(value)
      value = value + 1
      // If consumer returns continuation function,
      // rest of the signal should be passed to it
      // and rest should be cleaned up.
      if (typeof(continuation) === "function")
        return continuation(range(value, to))
    }
    end()
  }
}

In a way this is similar to pull streams with a difference that size of chunks is dictated by consumer instead of making chunk size of value.

Transformations

Transformation functions need to be avare of pausing in order to handle it properly:

function map(f, signal) {
  return function mapped(next, end) {
    function rest(continuation) {
      return function(signal) {
        continuation(map(f, signal))
      }
    }
    signal(function(value) {
      var continuation = next(value)
      return typeof(continuation) ? rest(continuation) :
             continuation
    }, end)
  }
}

Although logic about pausing can be easily factored out:

function pausable(transform) {
  return function() {
    var params = Array.prototype.slice(arguments)
    var signal = params.shift()
    
    // Internal utility function takes `continuation` function
    // and returns a forwarder function that given a rest of
    // the signal, passes transformed version (transformation
    // described by an out function) back to curried continuation.
    // This is basically used to apply same transformation to
    // the rest of the stream before passing it along the pipeline.
    function forward(continuation, rest) {
      function f(rest) {
        var transformedRest = transform.apply(transform, [params].concat(rest))
        continuation(transformedRest)
      }
      return rest ? f(rest) : f
    }
    
    // Forward a given signal to an inlined continuation
    // that returns transformed version of it.
    return forward(function(signal) {
      return function(next, end) {
        signal(function(value) {
          // If reader wishes to get rest of the signal
          // to read it later, wrap continuation to
          // forward transformed version of rest.
          var continuation = next(value)
          return typeof(continuation) === "function" ? forward(continuation) :
                 continuation
        }, end)
      }
    }, signal)
  }
}

var map = pausable(function(f, signal) {
  return function mapped(next, end) {
    signal(function(value) {
      return next(f(value))
    }, end)
  }
})

var filter = pausable(function(p, signal) {
  return function filtered(next, end) {
    signal(function(value) {
      return p(value) ? next(value) : value
    }, end)
  }
})

Stoping signals

Sometimes it's necessary to stop signal processing, which is a special case of pausing it. Unlike regular pausing though stop carries additional intent to free up resources. For example if signal represents operation that opens file descriptor, yelds read conent of file and closes file descriptor. You'd expect that takeWhile(isntEmptyLine, fs.read(path)) would read file content until an empty line and then close the file descriptor. Simply pausing won't close file descriptor. If such cases are relevant to a signal (they're not for ranges in the example above for example) they should recognize special continuation.stop attribute. If it's true signal should release all the associated resources and end the the signal.

function stop() {}
stop.stop = true

var takeWhile = pausable(function(p, signal) {
  return function(next, end) {
    signal(function(value) {
      return p(value) ? next(value) : stop
    }, end)
  }
})

More coordination

Special readers can also extend protocol to encorporate additional attributes, which specific signals could recognize and optimize behavior accordingly. For example special flags can be used to indicate that reader want's to pause http request, but instead of actually pausing socket it would rather let it buffer. If underlaying signal recognizes that it will optimize behavior, if not it will degradate to a default pausing mechanism.

@Gozala
Copy link
Author

Gozala commented Apr 4, 2013

Open questions

  1. How do we indicate interrupt ? Tempted to say that pause and I'll never
    read tail is that, but how do we know when to close file descriptors ?
  2. If your represent file content as a signal, you'll wish to express chain
    of open -> read, read, read -> close. Also ideally when you read part of the
    file you'll do open -> read-part -> close. Likely interrupt should cause
    close, so pause and never read end is not a good option.

@Raynos
Copy link

Raynos commented Apr 4, 2013

To generalize map and rest a bit more:

// map:: (A -> B) -> Signal<A> -> Signal<B>
function map(lambda) { return function duplex(source) {
    return function signal(next, end) {
        signal(function(value) {
            var sink = next(value)
            return sink ? rest(duplex, sink) : null
        }, end)
    }
} }

// rest :: (Signal<A> -> Signal<B>) -> Sink<B> -> Sink<A>
function rest(duplex, destination) {
    return function sink(signal) { 
        destination(duplex(signal))
    }
}

The above is a correct implementation of map. However it's weird because rest takes the sinks in the wrong order which is messed up.

@Gozala
Copy link
Author

Gozala commented Apr 4, 2013

Use of functions for reader to input communication is limited since all the attributes will have to be manually copied, it would be better to use objects such that wrapping could just do

Object.create(original, {
  continuation: {
    value: function(rest) {
      original.continuation(transform(rest))
    }
})

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment