Skip to content

Instantly share code, notes, and snippets.

@ghetolay
Created January 30, 2018 10:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ghetolay/ddc737d8552aeaa8ef34fc6a3e085c32 to your computer and use it in GitHub Desktop.
Save ghetolay/ddc737d8552aeaa8ef34fc6a3e085c32 to your computer and use it in GitHub Desktop.
Audit Map operator for rxjs
// project signature should be (value: T, index: number) => ObservableInput<I>
// ObservableInput instead of Observable but I don't care about handling promise and array like.
export function auditMap<T, I>(
this: Observable<T>,
project: (value: T, index: number) => Observable<I>): Observable<I>;
export function auditMap<T, I, R>(
this: Observable<T>,
project: (value: T, index: number) => Observable<I>,
resultSelector: ((outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R)): Observable<I | R>
export function auditMap<T, I, R>(
this: Observable<T>,
project: (value: T, index: number) => Observable<I>,
resultSelector?: ((outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R)): Observable<I | R> {
let nextArgs: [T, number] | undefined;
let isRunning = false;
const subscriberFact = (ob: Observer<I>): Observer<I> => ({
next: (v: I) => { ob.next(v); },
error: (e: any) => { ob.error(e); },
complete: () => {
if (nextArgs !== undefined) {
const [nextE, nextI] = nextArgs;
nextArgs = undefined;
project(nextE, nextI).subscribe(subscriberFact(ob));
}
else {
isRunning = false;
ob.complete();
}
}
});
return mergeMap.call(this,
(e: T, i: number) => {
if (!isRunning) {
isRunning = true;
return Observable.create( (observer: Observer<I>) => { project(e, i).subscribe(subscriberFact(observer)); });
}
else {
nextArgs = [e, i];
return empty();
}
},
resultSelector
);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment