Skip to content

Instantly share code, notes, and snippets.

Last active September 22, 2023 23:49
Show Gist options
  • Save alanshaw/591dc7dd54e4f99338a347ef568d6ee9 to your computer and use it in GitHub Desktop.
Save alanshaw/591dc7dd54e4f99338a347ef568d6ee9 to your computer and use it in GitHub Desktop.
Streaming iterables WAT?

Streaming iterables

Your friends from pull stream, but in terms of async iterators.

source it

A "source" is something that can be consumed. It is an iterable object.

const ints = {
  [Symbol.asyncIterator] () {
    let i = 0
    return {
      async next () {
        return { done: false, value: i++ }

// or, more succinctly using a generator and for/await:

const ints = (async function * () {
  let i = 0
  while (true) yield i++

In Node.js stream terms this is a "readable".

sink it

A "sink" is something that consumes (or drains) a source. It is a function that takes a source and iterates over it. It optionally returns a value.

const logger = async source => {
  const it = source[Symbol.asyncIterator]()
  while (true) {
    const { done, value } = await
    if (done) break
    console.log(value) // prints 0, 1, 2, 3...

// or, more succinctly using a generator and for/await:

const logger = async source => {
  for await (const chunk of source) {
    console.log(chunk) // prints 0, 1, 2, 3...

In Node.js stream terms this is a "writable".

transform it

A "transform" is both a sink and a source where the values it consumes and the values that can be consumed from it are connected in some way. It is a function that takes a source and returns a source.

const doubler = source => {
  return {
    [Symbol.asyncIterator] () {
      const it = source[Symbol.asyncIterator]()
      return {
        async next () {
          const { done, value } = await
          if (done) return { done }
          return { done, value: value * 2 }
        return () {
          return it.return && it.return()

// or, more succinctly using a generator and for/await:

const doubler = source => (async function * () {
  for await (const chunk of source) {
    yield chunk * 2

duplex it

A "duplex" is similar to a transform but the values it consumes are not necessarily connected to the values that can be consumed from it. It is an object with two properties, sink and source.

const duplex = {
  sink: async source => {/* ... */},
  source: { [Symbol.asyncIterator] () {/* ... */} }


To thread together multiple streaming iterables, you can just call them:


This can look a bit back-to-front so you could use a convenience function to "pipe" them together and make the code more readable:

const pipe = (...fns) => {
  let res
  while (fns.length)
    res = fns.shift()(res)
  return res

pipe(() => ints, doubler, logger)
// Note, we can await on pipe for the return value of logger:
// const result = await pipe(...) is a utility module that does this.

Copy link

@bmordan yes 😁

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment