Last active
June 18, 2019 09:53
-
-
Save wernerdegroot/9b795964e944bd47782dac3c2aa0865d to your computer and use it in GitHub Desktop.
New RxJs operator `exhaustMapWithLatest`
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { TestScheduler } from 'rxjs/testing' | |
import { exhaustMapWithLatest } from './exhaustMapWithLatest' | |
import { of } from 'rxjs' | |
describe('exhaustMapWithLatest', () => { | |
it('should act as `map` when the inner `Observable<..>` completes directly', () => { | |
const scheduler = new TestScheduler((actual, expected) => { | |
expect(actual).toEqual(expected) | |
}) | |
const sourceMarble = '--a--b--c|' | |
const expectedMarble = '--A--B--C|' | |
const sourceObservable = scheduler.createColdObservable(sourceMarble, { a: 1, b: 2, c: 3 }) | |
const expectedValues = { | |
A: 10, | |
B: 20, | |
C: 30 | |
} | |
const resultObservable = sourceObservable.pipe(exhaustMapWithLatest(x => of(10 * x))) | |
scheduler.expectObservable(resultObservable).toBe(expectedMarble, expectedValues) | |
scheduler.flush() | |
}) | |
it('should behave like `concatMap` when there is no more than one new outer during subscription to inner', () => { | |
const scheduler = new TestScheduler((actual, expected) => { | |
expect(actual).toEqual(expected) | |
}) | |
const sourceMarble = '--a--b--c|' | |
const expectedMarble = '------A----B----C|' | |
const sourceObservable = scheduler.createColdObservable(sourceMarble, { a: 1, b: 2, c: 3 }) | |
const expectedValues = { | |
A: 10, | |
B: 20, | |
C: 30 | |
} | |
const resultObservable = sourceObservable.pipe(exhaustMapWithLatest(x => scheduler.createColdObservable('----X|', { X: x * 10 }))) | |
scheduler.expectObservable(resultObservable).toBe(expectedMarble, expectedValues) | |
scheduler.flush() | |
}) | |
it('should behave a bit like `exhaustMap` when there is are more than one new outers during subscription to inner (but also apply the operator the last outer)', () => { | |
const scheduler = new TestScheduler((actual, expected) => { | |
expect(actual).toEqual(expected) | |
}) | |
const sourceMarble = '--a--b--c|' | |
const expectedMarble = '---------A-------C|' | |
const sourceObservable = scheduler.createColdObservable(sourceMarble, { a: 1, b: 2, c: 3 }) | |
const expectedValues = { | |
A: 10, | |
B: 20, | |
C: 30 | |
} | |
const resultObservable = sourceObservable.pipe(exhaustMapWithLatest(x => scheduler.createColdObservable('-------X|', { X: x * 10 }))) | |
scheduler.expectObservable(resultObservable).toBe(expectedMarble, expectedValues) | |
scheduler.flush() | |
}) | |
}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable, OperatorFunction, defer, empty, concat } from 'rxjs' | |
import { mergeMap } from 'rxjs/operators' | |
// Behaves a bit like `exhaustMap`. | |
// Instead of processing one `Outer` and ignoring every | |
// other `Outer` that is received while processing the fist, | |
// this operator collects the last `Outer` received and also | |
// lets you map that to `Observable<Inner>`: | |
// | |
// input: --a--b--c--d | |
// fn: x => --------X| | |
// expected: ----------A--------C--------D | |
export const exhaustMapWithLatest = <Outer, Inner>(fn: (outer: Outer) => Observable<Inner>): OperatorFunction<Outer, Inner> => (source: Observable<Outer>): Observable<Inner> => { | |
return defer(() => { | |
let subscribedToInner: boolean = false | |
let last: { value: Outer } | null = null | |
function nextOuter(outer: Outer): Observable<Inner> { | |
if (subscribedToInner) { | |
last = { value: outer } | |
return empty() | |
} else { | |
subscribedToInner = true | |
last = null | |
return concat( | |
fn(outer), | |
defer(() => { | |
subscribedToInner = false | |
return last === null ? empty() : nextOuter(last.value) | |
}) | |
) | |
} | |
} | |
return source.pipe(mergeMap(nextOuter)) | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment