Skip to content

Instantly share code, notes, and snippets.

@mcattarinussi
Created December 14, 2019 22:00
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mcattarinussi/231d996194da8d417d4e097d608f701a to your computer and use it in GitHub Desktop.
Save mcattarinussi/231d996194da8d417d4e097d608f701a to your computer and use it in GitHub Desktop.
Subscribe to multiple inner observables concurrently and emit results in order
import {
  concat,
  from,
  BehaviorSubject,
  Observable,
  ObservableInput,
  ObservedValueOf,
  OperatorFunction,
  MonoTypeOperatorFunction,
} from 'rxjs';
import {
  mergeMap,
  delayWhen,
  finalize,
  ignoreElements,
  skipWhile,
  take,
} from 'rxjs/operators';
/**
 * Holds back everything an inner stream produces — its values AND its
 * completion — until `activeIndex$` reports `innerIndex`.
 *
 * Values are buffered by `delayWhen` and released, in their original order,
 * as soon as the gate opens; values arriving while the gate is already open
 * pass straight through.
 *
 * Completion is gated too. `delayWhen` on its own completes immediately when
 * the source completes with no values pending (for example an empty inner
 * stream), which would let a downstream `finalize` run before this stream's
 * turn. Appending the gate with `concat` keeps the completion waiting until
 * the stream is actually the active one.
 *
 * @param activeIndex$ Emits the index that is currently allowed to emit.
 *   Expected to replay its current value to late subscribers (the caller in
 *   this file passes a `BehaviorSubject`), since the gate is subscribed
 *   lazily, once per buffered value.
 * @param innerIndex Index of the inner stream this operator is applied to.
 * @returns An operator that re-emits the source unchanged, but not before
 *   `innerIndex` is the active index. Source errors are not delayed.
 */
const delayUntilIsActiveIndex = <T>(
  activeIndex$: Observable<number>,
  innerIndex: number
): MonoTypeOperatorFunction<T> => {
  // Emits once, then completes, the first time this inner stream is active.
  const becomesActive$ = activeIndex$.pipe(
    skipWhile(activeIdx => activeIdx !== innerIndex),
    take(1)
  );

  return source =>
    concat(
      source.pipe(delayWhen(() => becomesActive$)),
      // Contributes no values; it only postpones completion until active.
      becomesActive$.pipe(ignoreElements())
    );
};
/**
 * Hands the turn to the next inner stream once this one is finished.
 *
 * Implemented with `finalize`, so the index is advanced when the stream
 * completes and also when it errors or is unsubscribed — not only on a
 * normal completion, despite the name.
 *
 * The type parameter is the type of the values flowing through the stream;
 * the operator never inspects them.
 *
 * @param activeIndex$ Subject holding the index currently allowed to emit.
 * @param innerIndex Index of the inner stream this operator is applied to.
 * @returns An operator that mirrors the source and pushes `innerIndex + 1`
 *   into `activeIndex$` on teardown.
 */
const bumpIndexOnComplete = <T>(
  activeIndex$: BehaviorSubject<number>,
  innerIndex: number
): MonoTypeOperatorFunction<T> =>
  finalize(() => activeIndex$.next(innerIndex + 1));
/**
 * Projects each source value to an inner observable, subscribes to up to
 * `concurrent` of them at the same time (like `mergeMap`), but gates their
 * output so that results are released following the index of the source
 * value that produced them.
 *
 * Each inner observable is piped through `delayUntilIsActiveIndex`, which
 * holds its output until its index is the active one, and
 * `bumpIndexOnComplete`, which advances the active index when it finishes.
 *
 * The active-index state is created per subscription. Creating it once per
 * operator instance would leave a re-subscription (`retry`, `repeat`, or a
 * second subscriber) waiting forever: `mergeMap` restarts its index at 0
 * while the shared subject would still hold the previous run's final index.
 *
 * @param project Maps a source value and its index to an observable input.
 * @param concurrent Maximum number of inner observables subscribed at once.
 *   Defaults to 5.
 * @returns An operator emitting the values of the projected observables.
 */
export function orderedMergeMap<T, O extends ObservableInput<any>>(
  project: (value: T, index: number) => O,
  concurrent = 5
): OperatorFunction<T, ObservedValueOf<O>> {
  return source =>
    new Observable<ObservedValueOf<O>>(subscriber => {
      // Fresh state for every subscription; index 0 is allowed to emit first.
      const activeIndex$ = new BehaviorSubject(0);

      const mergeInOrder: OperatorFunction<T, ObservedValueOf<O>> = mergeMap(
        (value: T, index: number) =>
          from(project(value, index)).pipe(
            delayUntilIsActiveIndex(activeIndex$, index),
            bumpIndexOnComplete(activeIndex$, index)
          ),
        concurrent
      );

      // Returning the subscription ties its teardown to the outer subscriber.
      return source.pipe(mergeInOrder).subscribe(subscriber);
    });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment