A signal represents a collection of values (over time) associated with a single identity. It can be expressed as follows:
function signal(next) {
  next(1)
  next(2)
  next(3)
  // ...
}
Signals in Haskell and Elm have no notion of end, which implies lots of technical constraints at the implementation level. While this may work in pure languages, it's very unnatural for a language like JS and the highly mutable environment it runs in. Therefore this definition of signal has a notion of end. A finite signal can be expressed as follows:
function signal(next, end) {
  next(1)
  next(2)
  next(3)
  end()
}
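A minimal sketch of the consuming side may help here. The `numbers` signal and the `collect` helper are hypothetical names introduced only for illustration; the only thing assumed from the definition is that a reader is a pair of `next` and `end` functions.

```javascript
// Hypothetical finite signal used only for this sketch.
function numbers(next, end) {
  next(1)
  next(2)
  next(3)
  end()
}

// Hypothetical helper: accumulates every value and reports
// the result once the signal ends.
function collect(signal, callback) {
  var values = []
  signal(function next(value) {
    values.push(value)
  }, function end() {
    callback(values)
  })
}

var collected = null
collect(numbers, function(values) { collected = values })
// `numbers` is synchronous, so `collected` is [1, 2, 3] at this point.
```

Nothing in the definition requires a signal to be synchronous, so a reader should only rely on `end` to know that no more values will arrive.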
Since signals may be used to represent IO, it's quite possible to run into race conditions. If a signal ends with an argument (different from null and undefined) it's treated as an error:
function signal(next, end) {
  next(1)
  end(Error("oops!"))
}
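On the reading side, the rule above could be applied roughly as follows. This is only a sketch: `failing` and `outcome` are hypothetical names, and the shape of the result object is an assumption made for the example.

```javascript
// Hypothetical signal that fails after yielding one value.
function failing(next, end) {
  next(1)
  end(Error("oops!"))
}

// Hypothetical helper: folds a signal into a single outcome object.
function outcome(signal, callback) {
  var values = []
  signal(function next(value) {
    values.push(value)
  }, function end(error) {
    // `error != null` is false for both null and undefined,
    // which matches the rule for a normal end.
    if (error != null) callback({ ok: false, error: error, values: values })
    else callback({ ok: true, values: values })
  })
}

var result = null
outcome(failing, function(value) { result = value })
// result.ok is false and result.error.message is "oops!"
```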
Invoking next or end after end was invoked is forbidden. Signals that do so are considered broken, although runtime enforcement of this is out of the scope of this definition. Preferably such enforcement can and will be done by second-tier utilities:
function normalize(signal, strict) {
  return function normalized(next, end) {
    var ended = false
    signal(function forwardNext(value) {
      if (ended) {
        if (strict) throw Error("Can not yield values from ended signal")
        else console.warn("Can not yield values from ended signal", signal, value)
      } else {
        return next(value)
      }
    }, function forwardEnd(error) {
      if (ended) {
        if (strict) throw Error("Can not end signal more than once")
        else console.warn("Can not end signal more than once", signal, error)
      } else {
        // Remember that the signal has ended and forward the end.
        ended = true
        return end(error)
      }
    })
  }
}
There is no single right choice between push and pull style streaming. Both have pros and cons, and depending on the problem scope one or the other is a better fit. Push style streams can be a lot more efficient since they don't require chopping at each transformation. On the other hand, reading just a part of a stream, pausing, and then resuming again is a lot harder with push style streams, especially in pure functional style.
This definition of signal attempts to take a hybrid approach. It favors push style for efficiency, but lets the consumer downgrade to pull style. Unlike linked lists of head, tail pairs, there are no predefined chunks. A signal starts pushing values, but the consumer can signal it to return the rest in the form of another signal:
function range(from, to) {
  return function signal(next, end) {
    var value = from
    while (value < to) {
      var continuation = next(value)
      value = value + 1
      // If consumer returns continuation function,
      // rest of the signal should be passed to it
      // and rest should be cleaned up.
      if (typeof(continuation) === "function")
        return continuation(range(value, to))
    }
    end()
  }
}
In a way this is similar to pull streams, with the difference that the size of chunks is dictated by the consumer instead of being fixed at a single value.
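To illustrate how a consumer dictates the chunk size, here is a sketch of a reader that takes a fixed number of values and then pauses by returning a continuation. `readSome` is a hypothetical helper, not part of the definition, and `range` is repeated only so the example is self-contained.

```javascript
// Same shape as the range signal described above.
function range(from, to) {
  return function signal(next, end) {
    var value = from
    while (value < to) {
      var continuation = next(value)
      value = value + 1
      if (typeof(continuation) === "function")
        return continuation(range(value, to))
    }
    end()
  }
}

// Hypothetical reader: reads `n` values (n >= 1), then pauses and hands
// the collected values plus the rest of the signal to `callback`.
// If the signal ends first, `rest` is null.
function readSome(n, signal, callback) {
  var values = []
  signal(function next(value) {
    values.push(value)
    if (values.length === n) {
      // Returning a function asks the signal for its rest.
      return function continuation(rest) {
        callback(values, rest)
      }
    }
  }, function end() {
    callback(values, null)
  })
}

var first = null
var second = null
readSome(2, range(0, 5), function(values, rest) {
  first = values
  readSome(2, rest, function(values) { second = values })
})
// first is [0, 1] and second is [2, 3]
```

The rest handed to the continuation is an ordinary signal, so it can be stored and read later, or simply dropped.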
Transformation functions need to be aware of pausing in order to handle it properly:
function map(f, signal) {
  return function mapped(next, end) {
    // Wraps consumer's continuation so that it receives
    // the mapped version of the rest of the signal.
    function rest(continuation) {
      return function(signal) {
        return continuation(map(f, signal))
      }
    }
    signal(function(value) {
      var continuation = next(f(value))
      return typeof(continuation) === "function" ? rest(continuation) :
             continuation
    }, end)
  }
}
The logic for pausing can easily be factored out, though:
function pausable(transform) {
  return function() {
    var params = Array.prototype.slice.call(arguments)
    // The signal is the last argument, as in `map(f, signal)`.
    var signal = params.pop()
    // Internal utility function takes `continuation` function
    // and returns a forwarder function that given a rest of
    // the signal, passes transformed version (transformation
    // described by the outer `transform` function) back to curried
    // continuation. This is basically used to apply same transformation
    // to the rest of the stream before passing it along the pipeline.
    function forward(continuation, rest) {
      function f(rest) {
        var transformedRest = transform.apply(transform, params.concat([rest]))
        return continuation(transformedRest)
      }
      return rest ? f(rest) : f
    }
    // Forward a given signal to an inlined continuation
    // that returns transformed version of it.
    return forward(function wrap(signal) {
      return function(next, end) {
        signal(function(value) {
          // If reader wishes to get rest of the signal
          // to read it later, wrap continuation to
          // forward transformed version of rest. The rest is
          // wrapped again so that it can be paused as well.
          var continuation = next(value)
          return typeof(continuation) === "function" ?
            forward(function(rest) { return continuation(wrap(rest)) }) :
            continuation
        }, end)
      }
    }, signal)
  }
}
var map = pausable(function(f, signal) {
  return function mapped(next, end) {
    signal(function(value) {
      return next(f(value))
    }, end)
  }
})
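var filter = pausable(function(p, signal) {
  return function filtered(next, end) {
    signal(function(value) {
      // Skipped values must not be returned, since the return
      // value is interpreted as a continuation by the signal.
      return p(value) ? next(value) : undefined
    }, end)
  }
})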
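As a rough illustration of how such transformations compose, the sketch below chains a map and a filter over a small source and reads the result. To stay self-contained it repeats both transformations without the pausable wrapper, so pausing is deliberately not handled here; `numbers`, `double` and `isLarge` are hypothetical names.

```javascript
// Pause-unaware transformations, repeated here without the
// `pausable` wrapper so that the sketch is self-contained.
function map(f, signal) {
  return function mapped(next, end) {
    signal(function(value) {
      return next(f(value))
    }, end)
  }
}

function filter(p, signal) {
  return function filtered(next, end) {
    signal(function(value) {
      if (p(value)) return next(value)
    }, end)
  }
}

// Hypothetical source signal.
function numbers(next, end) {
  next(1)
  next(2)
  next(3)
  next(4)
  end()
}

function double(x) { return x * 2 }
function isLarge(x) { return x > 4 }

var values = []
var ended = false
filter(isLarge, map(double, numbers))(function next(value) {
  values.push(value)
}, function end() {
  ended = true
})
// values is [6, 8] and ended is true
```

Each transformation returns whatever the downstream `next` returned, which is what lets a continuation from the reader travel back to the source.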
Sometimes it's necessary to stop signal processing, which is a special case of pausing it. Unlike regular pausing, though, stop carries an additional intent to free up resources. For example, a signal may represent an operation that opens a file descriptor, yields the content read from the file and closes the file descriptor. You'd expect that takeWhile(isntEmptyLine, fs.read(path)) would read file content until an empty line and then close the file descriptor. Simply pausing won't close the file descriptor. If such cases are relevant to a signal (they're not for the ranges in the example above) it should recognize a special continuation.stop attribute. If it's true, the signal should release all the associated resources and end the signal.
function stop() {}
stop.stop = true

var takeWhile = pausable(function(p, signal) {
  return function(next, end) {
    signal(function(value) {
      return p(value) ? next(value) : stop
    }, end)
  }
})
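The source side of this protocol might look roughly like the sketch below. `readLines` is a hypothetical in-memory stand-in for a file-backed signal, and the `log` array merely records when the pretend descriptor would be opened and closed; regular pausing is left out to keep the example short.

```javascript
function stop() {}
stop.stop = true

// Hypothetical stand-in for a file-backed signal. It only
// recognizes the stop request, not regular pausing.
function readLines(lines, log) {
  return function signal(next, end) {
    log.push("open")
    for (var index = 0; index < lines.length; index++) {
      var continuation = next(lines[index])
      // A continuation flagged with `stop` asks to release resources.
      if (continuation && continuation.stop === true) break
    }
    // Reached both on a stop request and after the last line.
    log.push("close")
    end()
  }
}

var log = []
var seen = []
readLines(["a", "b", "", "c"], log)(function next(line) {
  if (line === "") return stop
  seen.push(line)
}, function end() {
  log.push("end")
})
// seen is ["a", "b"] and log is ["open", "close", "end"]
```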
Special readers can also extend the protocol to incorporate additional attributes, which specific signals could recognize to optimize their behavior accordingly. For example, special flags can be used to indicate that a reader wants to pause an HTTP request, but instead of actually pausing the socket would rather let it buffer. If the underlying signal recognizes the flag it will optimize its behavior; if not, it will degrade to the default pausing mechanism.
Open questions
read tail is that, but how do we know when to close file descriptors?
of open -> read, read, read -> close. Also ideally when you read part of the file you'll do open -> read-part -> close. Likely interrupt should cause close, so pause and never read end is not a good option.