Iterables, AsyncIterables, Observables, Oh My!

I know there is a lot of confusion around Observables, Iterables, AsyncIterables and AsyncObservables. Let's break these down and look at the reasons for each.

Pull versus Push Collections

When it comes to collections, there are two ways of thinking about them: push versus pull. With pull, the consumer is in control of when it gets the items, but with push, the consumer gets the values when the producer is ready.

Pull Based Collections

With JavaScript, we have pull based collections built into the language in the form of the Iterable, which covers Array, Set, Map and even Generator. To retrieve items from the collection, you explicitly pull each one from its iterator or, in the case of arrays and other indexed collections, read it by index. In other words, you only get values when the consumer is ready to retrieve them. You get the iterator directly by calling the [Symbol.iterator] method. In the following example, we'll pull items directly using the iterator.

const source = [1, 2, 3, 4, 5];
const it = source[Symbol.iterator]();

let next;
while (!(next = it.next()).done) {
  console.log(next.value);
}

Luckily, JavaScript gives us a nicer way of iterating over collections: the for ... of syntax, which calls the [Symbol.iterator] method for us and even lets us destructure each item, such as a [key, value] pair from a Map.

const source = new Map([
  [1, 'one'],
  [2, 'two'],
  [3, 'three']
]);

for (const [key, value] of source) {
  console.log(`Key: ${key}, Value: ${value}`);
}

There are library approaches to Iterables as well, such as IxJS, whose IterableX (iterable extensions) lets us call operators like map, filter, scan, etc. on any iterable sequence.

import { of } from 'ix/iterable';
import { map } from 'ix/iterable/operators';

const source = of(1, 2, 3, 4).pipe(
  map(item => item * item)
);

for (const item of source) {
  console.log(item);
}

Pull based collections are great for synchronous data, either finite or infinite, where no network calls or other I/O need to be made.
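To make the "finite or infinite" point concrete, here is a minimal sketch using plain generators. The names `naturals` and `take` are made up for this illustration and are not part of IxJS; the point is only that nothing is computed until the consumer pulls.

```javascript
// An infinite pull-based sequence: values are computed only on demand.
function* naturals() {
  let n = 1;
  while (true) {
    yield n++;
  }
}

// Hypothetical helper: pull at most `count` items from any iterable.
function* take(source, count) {
  if (count <= 0) return;
  let taken = 0;
  for (const item of source) {
    yield item;
    if (++taken >= count) return;
  }
}

// The consumer decides how much of the infinite sequence to pull.
const firstFive = [...take(naturals(), 5)];
console.log(firstFive); // [ 1, 2, 3, 4, 5 ]
```

Because the consumer drives the loop, the infinite `naturals()` sequence never runs ahead of what `take` asks for.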

Push Based Collections

On the other hand, you have push based collections, where you get values when the producer is ready. This is useful for events or any values over time, where it is unknown when the next value will arrive, so the producer lets the consumer know when there are values. Most of us have seen this through the Subject/Observer pattern from the Gang of Four book, as in the EventTarget API in the DOM or the EventEmitter class in Node.js.

const event = new Event('build');

// Listen for the event
const listener = e => { /* ... */ };

elem.addEventListener('build', listener, false);

// Dispatch the event
elem.dispatchEvent(event);

// Remove the event listener
elem.removeEventListener('build', listener, false);

Many people, however, are familiar with Push Collections via Observables and RxJS where you can create sequences from events, synchronous sources, Promises and other types.

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

const source = of(1, 2, 3, 4).pipe(
  map(item => item * item)
);

const subscription = source.subscribe({
  next: item => console.log(item)
});

These push events may be synchronous or asynchronous depending on the source, which can cause all sorts of issues, such as missing data if you don't subscribe to the source immediately. On top of that, setting up and tearing down your listener and source is completely synchronous. The other disadvantage is that the consumer has relatively little control over how fast the producer sends values, so the consumer can get overwhelmed. Problems like this are usually solved with backpressure, but that's rather hard to implement on a purely push sequence. That's not to say there aren't options, such as lossy and lossless operations, but they all involve either dropping data or buffering it in memory.
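The "missing data" problem is easy to reproduce. The sketch below hand-rolls a tiny push source (`HotSource` is a hypothetical class for this illustration, not an RxJS type) that emits synchronously to whoever is listening at that moment and keeps no history, so a late listener never sees earlier values.

```javascript
// A minimal "hot" push source: it emits to current listeners only
// and keeps no history. (Hypothetical class, for illustration.)
class HotSource {
  constructor() {
    this.listeners = new Set();
  }

  subscribe(listener) {
    this.listeners.add(listener);
    // Return a synchronous teardown function, much like removeEventListener.
    return () => this.listeners.delete(listener);
  }

  emit(value) {
    for (const listener of this.listeners) listener(value);
  }
}

const hot = new HotSource();
const received = [];

hot.emit(1); // nobody is listening yet, so this value is lost
const unsubscribe = hot.subscribe(value => received.push(value));
hot.emit(2);
hot.emit(3);
unsubscribe();
hot.emit(4); // lost again: the listener has been removed

console.log(received); // [ 2, 3 ]
```

Recovering the lost values would mean buffering them inside the source, which is exactly the memory trade-off mentioned above.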

Asynchronous Pull/Push and Push/Pull

Just as we have synchronous pull and push sequences, we also have hybrid approaches: pull/push and push/pull.

Pull/Push Based Collections

Relatively new to the JavaScript world is the concept of an asynchronous sequence, such as the AsyncIterable or AsyncGenerator. Here, every call to next on the async iterator gives us a Promise of an iterator result containing the current value and whether the sequence is done. We get the async iterator for the sequence by calling the [Symbol.asyncIterator] method. This is a pull/push sequence in that you pull the value by calling next(), and the Promise then pushes the value to you when it is ready. Since the iterator's next method returns a Promise, we can simply await each iterator result, such as in a while loop.

const sourceFunction = async function* () {
  yield Promise.resolve(1);
  yield Promise.resolve(2);
  yield Promise.resolve(3);
};

const source = sourceFunction();
const it = source[Symbol.asyncIterator]();

let next;
while (!(next = await it.next()).done) {
  console.log(next.value);
}

Of course there's also a nice shorthand for iterating over an async iterable using the for await ... of construct.

const sourceFunction = async function* () {
  yield Promise.resolve(1);
  yield Promise.resolve(2);
  yield Promise.resolve(3);
};

const source = sourceFunction();

for await (const item of source) {
  console.log(item);
}

What's nice about async iterables is that they put the consumer in charge of how fast to consume the data from the producer, which is important for I/O and network bound calls. For example, you wouldn't want to read a 2TB file using an Observable, where the producer keeps pushing bytes as fast as it can read them, so having the consumer in charge is key. In fact, Node.js readable streams now implement the AsyncIterable protocol, making them much easier to consume.

import * as fs from 'fs';

const readStream = fs.createReadStream('build.log', { encoding: 'utf8', highWaterMark: 1024 });

for await (const chunk of readStream) {
  console.log(`chunk: ${chunk}`);
}
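The consumer-driven pacing described earlier can be observed without touching the file system. In this small sketch (`produce`, `consume` and `delay` are names invented here), the async generator only advances to its next `yield` when the slow consumer asks for another item, so production and consumption strictly alternate.

```javascript
// Resolve after `ms` milliseconds.
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

const log = [];

// The producer only runs up to the next `yield` when the consumer pulls.
async function* produce() {
  for (let i = 1; i <= 3; i++) {
    log.push(`produced ${i}`);
    yield i;
  }
}

async function consume() {
  for await (const item of produce()) {
    await delay(10); // a deliberately slow consumer
    log.push(`consumed ${item}`);
  }
  return log;
}

const finished = consume();
finished.then(entries => console.log(entries.join('\n')));
```

The log reads "produced 1, consumed 1, produced 2, ..." rather than three "produced" entries up front, because the generator is suspended between pulls.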

Just as we have a wrapper around the Iterable with IxJS, we also have a wrapper around the AsyncIterable via AsyncIterableX, the async iterable extensions. This also allows asynchronous projections, such as async callbacks that return Promises for operators like map.

import { of } from 'ix/asynciterable';
import { map } from 'ix/asynciterable/operators';

const source = of(1, 2, 3, 4).pipe(
  map(item => item * item)
);

for await (const item of source) {
  console.log(item);
}

We can model things such as events using async iterables; however, that usually involves buffering at some point to hold on to every emitted item until the consumer asks for it. So, maybe that's not ideal.
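Here is a rough sketch of what that buffering looks like. `fromPush` is a hypothetical adapter written for this post, not an IxJS API; it assumes a single consumer and its buffer is unbounded, which is precisely the concern.

```javascript
// Hypothetical adapter: turns a push source into an AsyncIterable.
// Values that arrive before the consumer pulls are held in `buffer`.
function fromPush(subscribe) {
  const buffer = [];  // values nobody has pulled yet (unbounded!)
  const waiting = []; // resolvers for pulls that arrived before a value
  let done = false;

  subscribe({
    next(value) {
      if (done) return;
      const resolve = waiting.shift();
      if (resolve) resolve({ value, done: false });
      else buffer.push(value);
    },
    complete() {
      done = true;
      for (const resolve of waiting.splice(0)) {
        resolve({ value: undefined, done: true });
      }
    }
  });

  return {
    [Symbol.asyncIterator]() {
      return {
        next() {
          if (buffer.length > 0) {
            return Promise.resolve({ value: buffer.shift(), done: false });
          }
          if (done) return Promise.resolve({ value: undefined, done: true });
          return new Promise(resolve => waiting.push(resolve));
        }
      };
    }
  };
}

// The producer pushes three values synchronously, before anyone pulls.
const buffered = fromPush(observer => {
  observer.next('a');
  observer.next('b');
  observer.next('c');
  observer.complete();
});

async function collect(source) {
  const items = [];
  for await (const item of source) items.push(item);
  return items;
}

const collected = collect(buffered);
collected.then(items => console.log(items)); // [ 'a', 'b', 'c' ]
```

Nothing is lost, but only because every value sat in memory until it was pulled; a fast producer and a slow consumer would grow `buffer` without limit.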

Push/Pull Based Collections

So far we've explored pure pull versus push, and asynchronous pull/push, but what if we took the Observable and made it deterministically asynchronous? That means the construction of the observable would be asynchronous, as would any teardown logic. In addition, all the notifications, such as calls to next, error and complete on the observer, would also be asynchronous. This has a number of advantages for issues that are problematic in pure push scenarios, such as backpressure, and it also fits the asynchronous nature of connecting to and disconnecting from sources such as network I/O.

In IxJS, we are creating the AsyncObservable for this very purpose and it should be ready in the next release. Here, we can create a sequence with asynchronous construction, emitting values, and asynchronous teardown.

import { AsyncObservable } from 'ix/asyncobservable';

const source = new AsyncObservable(async observer => {

  const onNext = async function(value) {
    await observer.next(value);
  };
  
  const onError = async function(err) {
    await observer.error(err);
  };
  
  // Connect to some async source with some async callbacks
  const asyncSource = openSource();
  const subscription = await asyncSource.connect(onNext, onError);

  // Return a function that disconnects asynchronously
  return async function() {
    // Disconnect from some async source
    await subscription.disconnect();
  };
});

// Subscribe to source asynchronously
const subscription = await source.subscribeAsync({
  next: async (item) => console.log(`Next: ${item}`),
  error: async (err) => console.log(`Error: ${err}`),
  complete: async () => console.log(`Complete!`)
});

// Unsubscribe asynchronously
await subscription.unsubscribeAsync();
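Since the IxJS version isn't released yet, here is a deliberately tiny, self-contained sketch of the idea. `TinyAsyncObservable` is a hypothetical class and not the IxJS implementation; for simplicity the producer emits everything during setup. What it shows is that awaiting `observer.next` makes the producer wait for a slow consumer, and that both setup and teardown can be awaited.

```javascript
// Hypothetical, minimal AsyncObservable: NOT the IxJS implementation.
class TinyAsyncObservable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }

  // Resolves once the (possibly asynchronous) setup has finished.
  async subscribeAsync(observer) {
    const teardown = await this.subscribeFn(observer);
    return {
      // Resolves once the (possibly asynchronous) teardown has finished.
      async unsubscribeAsync() {
        if (teardown) await teardown();
      }
    };
  }
}

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
const events = [];

const numbers = new TinyAsyncObservable(async observer => {
  events.push('connect');
  for (const value of [1, 2]) {
    events.push(`send ${value}`);
    // Awaiting next() means the producer waits for the consumer.
    await observer.next(value);
  }
  await observer.complete();
  // Asynchronous teardown.
  return async () => {
    await sleep(5);
    events.push('disconnect');
  };
});

async function run() {
  const subscription = await numbers.subscribeAsync({
    next: async value => {
      await sleep(10); // a slow consumer
      events.push(`handled ${value}`);
    },
    complete: async () => {
      events.push('complete');
    }
  });
  await subscription.unsubscribeAsync();
  return events;
}

const trace = run();
trace.then(entries => console.log(entries.join(' -> ')));
```

The trace shows each value being handled before the next one is sent, which is the backpressure that a purely synchronous push source cannot give you.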

Then, of course, we could implement async projections for operators such as map, filter, scan and more, and use them like this:

import { of } from 'ix/asyncobservable';
import { map } from 'ix/asyncobservable/operators';

const source = of(1, 2, 3, 4).pipe(
  map(async (item) => item * item)
);

const subscription = await source.subscribeAsync({
  next: async (item) => console.log(`Next: ${item}`),
  error: async (err) => console.log(`Error: ${err}`),
  complete: async () => console.log(`Complete!`)
});

As I've said, the advantages here are many when we're dealing with remote sources such as network I/O, where we can gracefully open and terminate connections. For a full implementation, see the existing AsyncObservable in the Rx .NET AsyncRx repository.

Recap

So, let's recap: we have four types of collections, each with its own purpose and place, and there's no one solution that rules them all.

  • Pull: Iterable - purely synchronous data, either finite or infinite
  • Push: Observable / Subject/Observer - eventual data such as DOM events, collections over time
  • Pull/Push: AsyncIterable - I/O or other asynchronous data where the consumer needs to be in control
  • Push/Pull: AsyncObservable - Network calls where creation/teardown, as well as projections, may be asynchronous.