Skip to content

Instantly share code, notes, and snippets.

@ghetolay
Last active December 1, 2017 23:18
Show Gist options
  • Save ghetolay/c9cd3416d35718045578cae2240d3733 to your computer and use it in GitHub Desktop.
Save ghetolay/c9cd3416d35718045578cae2240d3733 to your computer and use it in GitHub Desktop.
flatMap proposal for v6

From that @dorus's comment and after talking a while with him about it, I think we can merge the implementation of all the flatten map operators: mergeMap, concatMap, switchMap, exhaustMap, debounce/audiMap (see #1777) into a single operator flexible enough to cover all scenarios. Then each of those operators would just be an alias of that main operator. Plus it may open new possibilities like the debounce/auditMap that doesn't exist yet.

I'll use flatMap and Queue because it's hard to find new meaningful names and it's not worth spending too much time on it at this stage.

Signature

flatMap<T>(project: (value: T, index: number) => ObservableInput<R>, queue: Queue): OperatorFunction<T, R>;

The new flatMap operator becomes just some kind of shell doing operator related stuff like managing inner/outer observable but it won't take any decision anymore about what to subscribe to or cancel. Those decisions will be delegated to queue which is a simple interface decorrelated from rxjs internals to make it accessible for users. Queue will be composed of 2 functions, analog to push & pop but specific to our case. Those functions will be hooks called :

  • when the source observable emits
  • when an inner observable complete.

The algorithm of flatMap will then be as follow :

  1. subscribe to source
  2. on source emits, depending on the queue implementation, it'll do zero or more of :
    • run project on an item and subscribe
    • cancel a running subscription
  3. on inner completion, depending on the queue implementation, it'll either :
    • run project on an item and subscribe
    • do nothing
  4. on source completion, inner emits, inner error and outer unsubscribe :
    • current behavior

Queue Interface

export interface Queue<T> {
	/**
	 *
	 * @param item new item emitted by the source
	 * @param actives list of items corresponding to actives subscriptions running
	 * @return a tuple with 2 values :
	 *   1. optional(arguable) item to run project and subscribe to 
	 *   2. optional index of subscription to cancel
	 */
	onNewItem(item: T, actives: T[]): [T | undefined, number | undefined] | undefined;

	/**
	 * 
	 * @param completed item corresponding to the completed observable
	 * @return optional, an item (to run project and subscribe to)
	 */
	onSubComplete(completed: T): T | void;
}

As you can see it's fairly simple (except the tuple maybe), straightforward and easy for users to implements. It's all just about items, and nothing about rx stuff like subscriptions. We can easily build a concurrent queue, buffer queue, priority queue etc...

Variant operators

With those queue implementations :

const NoQueue = {
	onNewItem: item => item,
	onSubComplete: () => { }
}

class ConcurrentFifoQueue<T> implements Queue<T> {
	private buffer: T[] = [];

	constructor(private concurrent = Number.POSITIVE_INFINITY, 
		private bufferSize = Number.POSITIVE_INFINITY, 
		private dropRunning = false) {}

	onNewItem(item: T, actives: T[]): [T | undefined, number | undefined] | void {
		// didn't reach maximum concurrent we can subscribe to item
		if (actives.length < this.concurrent)
			return [item];

		// max concurrent reached, save item on buffer nothing else
		if (this.buffer.length < this.bufferSize) {
			this.buffer.push(item);
			return;
		}

		// buffer overflow, remove latest item and add the new item
		this.buffer.push(item);
		const dropItem = this.buffer.shift();

		// drop latest running subscription and subscribe to latest buffered item
		if (this.dropRunning) {
			return [dropItem, actives.length - 1];
		}

		// drop latest item and do nothing else.
	}

	onSubComplete(): T | void {
		if (this.buffer.length > 0)
			return this.buffer.shift();
	}
}

We can now express all existings flatten operators as an alias :

  • mergeMap: flatMap(project, NoQueue)
  • concatMap: flatMap(project, new ConcurrentFifoQueue(1)) (default buffersize being infinity).
  • exhaustMap: flatMap(project, new ConcurrentFifoQueue(1, 0)) (default drop being drop item).
  • switchMap: flatMap(project, new ConcurrentFifoQueue(1, 0, true))
  • debounce/auditMap: flatMap(project, new ConcurrentFifoQueue(1, 1))

Pro/cons

Pro :

  • Add flexibility and offers new possibilities (debounceMap, priority queue...)
  • reduce lib code size
  • easier to maintain

Con:

  • obviously some perf penality due to the flexibility but should be very minimal (only a bunch of conditonals and array read/write).
  • onNewItem signature is not the sexiest API cause of the tuple, I tried removing it but ended up with even more complicated API.

Thanks

Thanks @Dorus for all your time spend :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment