Skip to content

Instantly share code, notes, and snippets.

@Raynos
Forked from Gozala/streamer2.js
Last active August 18, 2022 22:24
Show Gist options
  • Save Raynos/5291762 to your computer and use it in GitHub Desktop.
Save Raynos/5291762 to your computer and use it in GitHub Desktop.
// A pull stream is a function readable(closing, cb<err, chunk>)
// To extract a chunk out of the stream simply call stream(null, cb)
// to get another value call it again.
// The stream will tell you if it's ended by invoking cb(true)
// The stream will tell you if it errored by invoking cb(Error)
// You can tell the stream to close / destroy by calling stream(true, cb)
// Example pull stream backed by a small in-memory list that lives in the
// closure. Each read hands out the next item; once the list is drained, or
// whenever a truthy `closing` is passed, the callback receives END (true).
var stream = (function () {
  var pending = [1, 2, 3, 4]

  function stream(closing, callback) {
    // A close request is simply acknowledged with END.
    if (closing) {
      callback(true)
      return
    }

    // shift() yields undefined once nothing is left, which is falsy.
    var next = pending.shift()
    if (!next) {
      callback(true)
      return
    }

    callback(null, next)
  }

  return stream
}())
// Unique sentinel object. `last` initialises its result to this and compares
// by identity to tell "no chunk seen yet" apart from any real chunk value.
var nil = {}
/**
 * Reads a pull stream to completion, logging every chunk to the console.
 *
 * Each chunk is written with console.log and followed by another read.
 * END (`true`) prints "END" and stops; any other truthy error value is
 * written with console.error and also stops the reading.
 *
 * @param {Function} stream - pull stream of the form (closing, callback).
 */
function print(stream) {
  stream(null, function log(error, value) {
    if (error === true) {
      console.log("END")
    } else if (error) {
      // The parameter is named `error`; the previous reference to an
      // undeclared `err` threw a ReferenceError instead of logging.
      console.error("eep! ", error)
    } else {
      console.log("> ", value)
      // Re-use this same callback to pull the next chunk.
      stream(null, log)
    }
  })
}
/**
 * Returns a pull stream whose chunks are the chunks of `source` passed
 * through `lambda`.
 *
 * The `closing` argument is handed to `source` untouched. A truthy `err`
 * from `source` is forwarded as-is with `null` as the value, and `lambda`
 * is not called for it.
 *
 * @param {Function} lambda - called with each chunk; its result is emitted.
 * @param {Function} source - pull stream of the form (closing, callback).
 * @returns {Function} the transformed pull stream.
 */
function map(lambda, source) {
  return function stream(closing, callback) {
    function onChunk(err, value) {
      if (err) {
        callback(err, null)
        return
      }
      var transformed = lambda(value)
      callback(err, transformed)
    }

    source(closing, onChunk)
  }
}
/**
 * Returns a pull stream that only emits the chunks of `source` for which
 * `predicate` returns a truthy value.
 *
 * @param {Function} predicate - called with each chunk.
 * @param {Function} source - pull stream of the form (closing, callback).
 * @returns {Function} the filtered pull stream.
 */
function filter(predicate, source) {
  return function stream(closing, callback) {
    function filtered(err, value) {
      // Any truthy err (END, Error or other message) goes straight through.
      if (err) return callback(err)

      if (predicate(value)) {
        callback(null, value)
      } else {
        // Rejected chunk: pull again on behalf of the same reader callback.
        source(null, filtered)
      }
    }

    source(closing, filtered)
  }
}
/**
 * Returns a pull stream that emits the running result of folding `lambda`
 * over the chunks of `source`, starting from `initial`.
 *
 * The accumulator is shared by every read of the returned stream.
 *
 * @param {Function} lambda - called as lambda(accumulator, chunk).
 * @param {*} initial - starting accumulator value.
 * @param {Function} source - pull stream of the form (closing, callback).
 * @returns {Function} pull stream of intermediate accumulator values.
 */
function reductions(lambda, initial, source) {
  var accumulated = initial

  return function stream(closing, callback) {
    source(closing, function step(err, value) {
      if (!err) {
        accumulated = lambda(accumulated, value)
        callback(null, accumulated)
        return
      }
      // Truthy err (END, Error, other message) is forwarded untouched.
      return callback(err)
    })
  }
}
/**
 * Returns a pull stream that emits only the final chunk of `source`.
 *
 * The first read drains `source` until it reports END, then emits the most
 * recent chunk (or END straight away if there were no chunks). Every later
 * read yields END. A non-END error from `source` is forwarded as-is.
 *
 * @param {Function} source - pull stream of the form (closing, callback).
 * @returns {Function} pull stream with at most one chunk.
 */
function last(source) {
  // `nil` is the file-level sentinel meaning "no chunk seen yet".
  var result = nil
  var ended = false
  return function stream(closing, callback) {
    if (ended) return callback(true)
    source(closing, function drain(err, value) {
      if (err === true) {
        ended = true
        result !== nil ? callback(null, result) : callback(true)
      } else if (err) {
        callback(err)
      } else {
        result = value
        // Keep pulling: previously the chunk was stored and nothing else
        // happened, so the reader's callback was never invoked.
        source(null, drain)
      }
    })
  }
}
/**
 * Folds `source` down to a single-chunk pull stream by composing
 * `reductions` (running accumulator values) with `last`.
 *
 * @param {Function} lambda - called as lambda(accumulator, chunk).
 * @param {*} initial - starting accumulator value.
 * @param {Function} source - pull stream of the form (closing, callback).
 * @returns {Function} whatever `last` returns for the running-results stream.
 */
function reduce(lambda, initial, source) {
  var running = reductions(lambda, initial, source)
  return last(running)
}
/**
 * A pull stream with no chunks: every call, whether a read or a close
 * request, is answered with END (`true`).
 *
 * @param {*} closing - ignored.
 * @param {Function} cb - invoked as cb(true).
 */
function empty(closing, cb) {
  cb(true)
}
/**
 * Returns a pull stream that emits the first `n` chunks of `source`.
 *
 * For `n <= 0` the shared `empty` stream is returned and `source` is never
 * called. Otherwise, once `n` chunks have been delivered, the next call
 * asks `source` to close (closing === true) and hands it the reader's
 * callback directly.
 *
 * @param {number} n - number of chunks to let through.
 * @param {Function} source - pull stream of the form (closing, callback).
 * @returns {Function} the truncated pull stream.
 */
function take(n, source) {
  if (n <= 0) return empty

  var remaining = n
  return function stream(closing, callback) {
    if (remaining === 0) {
      return source(true, callback)
    }

    source(closing, function onChunk(err, value) {
      if (err) return callback(err)
      // Only successfully delivered chunks count against the quota.
      remaining -= 1
      callback(null, value)
    })
  }
}
/**
 * Returns a pull stream that emits chunks of `source` for as long as
 * `predicate` returns a truthy value for them.
 *
 * On the first chunk that fails the predicate, `source` is asked to close
 * (closing === true) and is given the reader's callback directly, so the
 * reader sees whatever `source` reports for that close request.
 *
 * @param {Function} predicate - called with each chunk.
 * @param {Function} source - pull stream of the form (closing, callback).
 * @returns {Function} the truncated pull stream.
 */
function takeWhile(predicate, source) {
  return function stream(closing, callback) {
    source(closing, function onChunk(err, value) {
      if (err) return callback(err)

      if (!predicate(value)) {
        source(true, callback)
        return
      }
      callback(null, value)
    })
  }
}
/**
 * Returns a pull stream that discards the first `n` chunks of `source` and
 * emits everything after them.
 *
 * For `n <= 0` nothing needs skipping, so `source` itself is returned.
 * If `source` ends or errors while chunks are still being skipped, that
 * END / error is forwarded to the reader.
 *
 * @param {number} n - number of leading chunks to discard.
 * @param {Function} source - pull stream of the form (closing, callback).
 * @returns {Function} the pull stream without its first `n` chunks.
 */
function drop(n, source) {
  // The original ternary had no else-branch (a syntax error) and compared
  // `n` against `source` rather than against zero.
  if (n <= 0) return source

  var remaining = n
  return function stream(closing, callback) {
    if (remaining <= 0) return source(closing, callback)
    source(closing, function forward(err, value) {
      if (err) return callback(err)
      remaining--
      // Once the quota is used up, the next chunk belongs to the reader;
      // until then keep discarding.
      if (remaining <= 0) return source(null, callback)
      source(null, forward)
    })
  }
}
/**
 * Returns a pull stream that discards leading chunks of `source` while
 * `predicate` returns a truthy value, then emits the first chunk that
 * fails the predicate and every chunk after it, with no further checks.
 *
 * @param {Function} predicate - called with each leading chunk.
 * @param {Function} source - pull stream of the form (closing, callback).
 * @returns {Function} the pull stream without its matching prefix.
 */
function dropWhile(predicate, source) {
  var forwarding = false
  return function stream(closing, callback) {
    // Once the prefix is gone this stream is a plain pass-through.
    if (forwarding) return source(closing, callback)
    source(closing, function skip(err, value) {
      if (err) return callback(err)
      if (predicate(value)) {
        // Discarded chunk: pull again. Previously nothing happened here,
        // so the reader's callback was never invoked.
        return source(null, skip)
      }
      forwarding = true
      callback(null, value)
    })
  }
}
/**
 * Tells whether `err` is an Error instance (including subclasses), as
 * opposed to END (`true`) or some other message on the err channel.
 *
 * @param {*} err - value received as a callback's first argument.
 * @returns {boolean} result of the `instanceof Error` check.
 */
function isError(err) {
  var inheritsFromError = err instanceof Error
  return inheritsFromError
}
/* Merge :: [Stream] -> Stream
merge is messy because it has to shutdown underlying streams cleanly per
protocol.
Ignoring error handling, all it does when it is read is either flush
a single item out of its buffer or make a read request to all
underlying streams in parallel and return the first result. This means
it will pull on all streams concurrently and give you a message once
the first one returns. So you can pull at the rate of the fastest
stream but still equally get messages from other streams.
*/
// Builds one pull stream out of the array `sources`. Returns `empty` when the
// array has no entries; otherwise returns a stream that, on each read, either
// hands out a buffered chunk or reads from every source not marked closed.
function merge(sources) {
// Chunks that arrived during a read whose callback had already been given
// a chunk; they are handed out one per call before the sources are read again.
var buffer = []
// closed[index] becomes true once sources[index] has answered a close request
// or has reported END / an Error during a read.
var closed = []
// Set once END or an error has been handled; source callbacks that fire
// afterwards do not reach the reader.
var ended = false
function shutdown(closing, callback) {
// if you close this stream it should close all the underlying
// streams. If any error propagate the first error. If they all
// close cleanly then return end.
// NOTE(review): count starts at sources.length, but sources skipped below as
// already closed never decrement it, so it looks like callback(true) cannot
// fire when some source was closed before this call. Confirm.
var count = sources.length
return sources.forEach(function (source, index) {
// Make sure to check whether this source is already closed
// If a source has errored, consider it closed
if (closed[index]) return
// `closing || true` turns a missing argument into a plain close request.
source(closing || true, function (err) {
closed[index] = true
// Anything other than END counts as a failed close here; only the first such
// result is reported.
if (err !== true && !ended) {
ended = true
callback(err)
} else if (!ended) {
--count
if (count === 0) {
ended = true
callback(true)
}
}
})
})
}
return sources.length === 0 ? empty : function stream(closing, callback) {
// only shutdown if closing === true
if (closing === true) {
return shutdown(closing, callback)
// forward all other `closing` messages to the first non-closed stream
} else if (closing) {
for (var i = 0; i < sources.length; i++) {
if (!closed[i]) {
return sources[i](closing, callback)
}
}
// If every source is already closed, execution falls through to a normal read.
}
if (buffer.length) {
return callback(null, buffer.shift())
}
// Tracks whether this particular read has already delivered a chunk.
var send = false
sources.forEach(function (source, index) {
if (closed[index]) return
source(null, function (err, value) {
if (ended) return
// Only shutdown if we have END or ERROR
// NOTE(review): this branch calls `callback` even when `send` is already true,
// so one read can apparently receive both a chunk and END / an Error. Confirm.
if (err === true || isError(err)) {
ended = true
// mark this stream as closed so that we don't try to
// close an already errored stream
closed[index] = true
callback(err)
// one stream errored, so close all the other ones
// shutdown() gets no arguments: `closing` defaults to true via `closing || true`,
// and because `ended` is already true neither of its branches touches the
// missing `callback`.
return shutdown()
// forward all other `err` channel messages
} else if (err) {
callback(err)
// if we have not send a chunk forward for this callback then
// just send it
} else if (!send) {
send = true
callback(null, value)
// otherwise buffer this chunk
} else {
buffer.push(value)
}
})
})
}
}

Pull stream

A pull stream is a representation of an asynchronous source that eventually ends and contains one or more chunks of data, which are not all available at the same time.

An infinite source of data with no natural way to end is not a pull stream. A source of data that always returns a single chunk is not a pull stream.

Interacting with a pull stream

To interact with a pull stream you use a reader. A reader is in charge of asking the pull stream for a chunk and then doing something with that data.

Pull stream interface

A pull stream is a function (closing, callback) {}. It is understood this function is not pure and has access to an asynchronous source in a closure scope.

When called with a falsey closing value it should not treat that value as any kind of closing message and should ignore it. If closing is falsey then it should have been called with a function callback(err, value). It is now responsible for either finding a chunk and calling callback(null, value) in the future, or determining that the stream contains no more data and calling callback(true), which is the mechanism for signalling to the reader that it has ended. Alternatively, the stream may transition into an error state and call callback(err).

A pull stream must adhere to sending one or more chunks followed by either END or ERROR. It must not send both END and ERROR. It must not send multiple ERRORs. It must not somehow recover from an ERROR and send more chunks.

If a reader calls the pull stream multiple times before the pull stream has called the readers callback it is allowed to send the chunks to any callback it wants. It must not call two callbacks with the same chunk. When the pull stream ends or errors it should call one of the callbacks with the error message / end message. It must not send error/end to all callbacks. Once the stream has either ended or errored it must not invoke the other callbacks.

If closing is the value true then the stream should attempt to close the underlying asynchronous source and then it must call the callback with either END, meaning that the source has successfully closed and no more data will flow through it, or ERROR, meaning that there was an error closing the source.

Once a pull stream has received closing it must not call any of its outstanding callbacks with a value.

A pull stream is not allowed to do any kind of asynchronous error inducing action unless it has an outstanding callback it can call with an error. This means that when a pull stream is created it must not open its underlying source until it's called at least once. This will avoid the unhandled error situation.

Reader interface

A reader may call the stream either one chunk at a time, or many in parallel. When a reader receives the END or ERROR message it is not allowed to call the pull stream again.

If a reader invokes a stream with closing === true then it is not allowed to call that stream again, ever.

Duplex streams

To operate on streams from a higher order point of view you should use duplex functions. A duplex takes a pull stream and returns a new pull stream. The new pull stream will do some kind of modification to either the values in the stream or the reading flow of the pull stream.

Backchannel addendum

Any duplex stream, i.e. a function PullStream -> PullStream, should pass forward the err message. If the err message is not true or an Error, the duplex should not interpret it as having any kind of special meaning and should just pass it on directly to the reader.

Any duplex stream should pass on the closing value backwards to the source. If it's not the value true it should not interpret it as having any kind of special meaning. Doing this is actually hard in the case of merge as we can't call the callback multiple times per closing message.

merge

When you have a merged stream of multiple inputs, it becomes very hard to know how to handle a custom closing message, as the reader is expecting to be talking to one stream, not multiple. This means that merge needs to know how to split the closing message, talk to each source, and then re-assemble a coherent response message to forward to the reader.

This is a non-trivial limitation of the "callback gets called once" model.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment