Your friends from pull stream, but in terms of async iterators.
A "source" is something that can be consumed. It is an iterable object.
const ints = {
[Symbol.asyncIterator] () {
let i = 0
return {
async next () {
return { done: false, value: i++ }
}
}
}
}
// or, more succinctly using a generator and for/await:
const ints = (async function * () {
let i = 0
while (true) yield i++
})()
In Node.js stream terms this is a "readable".
A "sink" is something that consumes (or drains) a source. It is a function that takes a source and iterates over it. It optionally returns a value.
const logger = async source => {
const it = source[Symbol.asyncIterator]()
while (true) {
const { done, value } = await it.next()
if (done) break
console.log(value) // prints 0, 1, 2, 3...
}
}
// or, more succinctly using a generator and for/await:
const logger = async source => {
for await (const chunk of source) {
console.log(chunk) // prints 0, 1, 2, 3...
}
}
In Node.js stream terms this is a "writable".
A "transform" is both a sink and a source where the values it consumes and the values that can be consumed from it are connected in some way. It is a function that takes a source and returns a source.
const doubler = source => {
return {
[Symbol.asyncIterator] () {
const it = source[Symbol.asyncIterator]()
return {
async next () {
const { done, value } = await it.next()
if (done) return { done }
return { done, value: value * 2 }
}
return () {
return it.return && it.return()
}
}
}
}
}
// or, more succinctly using a generator and for/await:
const doubler = source => (async function * () {
for await (const chunk of source) {
yield chunk * 2
}
})()
A "duplex" is similar to a transform but the values it consumes are not necessarily connected to the values that can be consumed from it. It is an object with two properties, sink
and source
.
const duplex = {
sink: async source => {/* ... */},
source: { [Symbol.asyncIterator] () {/* ... */} }
}
To thread together multiple streaming iterables, you can just call them:
logger(doubler(ints))
This can look a bit back-to-front so you could use a convenience function to "pipe" them together and make the code more readable:
const pipe = (...fns) => {
let res
while (fns.length)
res = fns.shift()(res)
return res
}
pipe(() => ints, doubler, logger)
// Note, we can await on pipe for the return value of logger:
// const result = await pipe(...)
https://github.com/alanshaw/it-pipe is a utility module that does this.
Some notes:
The
streaming-iterables
module is ace, we should totally use itThe
pipeline
function fromstreaming-iterables
does exactly what is mentioned herepipeline
doesn't work with duplex objects -pull
has magic that allows you to use them in a pipeline. Right now the workaround is:We can build a smarter pipeline that works with duplex and source (so the first item in the pipeline doesn't need a function wrapper).
EDIT: I did! https://github.com/alanshaw/it-pipe