@traviskaufman
Last active November 26, 2019 17:31
Demystifying RxJS, Part II: MergeMap
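import { Observable, OperatorFunction, Subscription } from 'rxjs';

function mergeMap<T, R>(
  project: (value: T, index: number) => Observable<R>
): OperatorFunction<T, R> {
  return source =>
    new Observable<R>(obs => {
      // Per-subscription state: each subscriber gets its own index and bookkeeping.
      let currentIndex = 0;
      let sourceCompleted = false;
      const subscriptions = new Set<Subscription>();

      // Complete only after the source and every inner observable have completed.
      const completeIfDone = () => {
        if (sourceCompleted && subscriptions.size === 0) {
          obs.complete();
        }
      };

      const sub = source.subscribe(
        x => {
          const projected = project(x, currentIndex++);
          // Register the handle first so a synchronously completing inner
          // observable can remove it without touching an uninitialized binding.
          const innerSub = new Subscription();
          subscriptions.add(innerSub);
          innerSub.add(
            projected.subscribe(
              px => obs.next(px),
              err => obs.error(err),
              () => {
                subscriptions.delete(innerSub);
                completeIfDone();
              }
            )
          );
        },
        err => obs.error(err),
        () => {
          sourceCompleted = true;
          completeIfDone();
        }
      );

      // Teardown: unsubscribe from the source and from any inner observables still running.
      return () => {
        sub.unsubscribe();
        subscriptions.forEach(s => s.unsubscribe());
      };
    });
}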
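
The flattening and completion rules of a merge-map style operator can also be sketched without RxJS. The snippet below is a minimal illustration only: `Source`, `Observer`, `fromArray`, and `mergeMapSketch` are hypothetical names invented for this example, not part of RxJS or the gist, and the sketch leaves out errors and unsubscription. It shows the index being passed to `project` and the output completing only once the source and every inner source have completed.

```typescript
// Hypothetical stand-ins for illustration; this is not the RxJS API.
type Observer<T> = { next: (value: T) => void; complete: () => void };
type Source<T> = (observer: Observer<T>) => void;

// Emits every array element synchronously, then completes.
function fromArray<T>(values: T[]): Source<T> {
  return observer => {
    values.forEach(v => observer.next(v));
    observer.complete();
  };
}

function mergeMapSketch<T, R>(
  source: Source<T>,
  project: (value: T, index: number) => Source<R>
): Source<R> {
  return observer => {
    let index = 0;          // per-subscription index handed to project
    let active = 0;         // inner sources that have not completed yet
    let sourceDone = false; // whether the outer source has completed

    const completeIfDone = () => {
      if (sourceDone && active === 0) {
        observer.complete();
      }
    };

    source({
      next: value => {
        active++;
        project(value, index++)({
          next: inner => observer.next(inner),
          complete: () => {
            active--;
            completeIfDone();
          },
        });
      },
      complete: () => {
        sourceDone = true;
        completeIfDone();
      },
    });
  };
}

// Usage: map each number to two labelled strings and flatten the results.
const received: string[] = [];
let completed = false;
mergeMapSketch(fromArray([1, 2]), (n, i) => fromArray([`${i}:${n}a`, `${i}:${n}b`]))({
  next: v => received.push(v),
  complete: () => {
    completed = true;
  },
});
```

Because every source in this sketch is synchronous, the inner values arrive in source order; with asynchronous inner sources the values would interleave, which is the defining behaviour of merging as opposed to concatenating.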