Skip to content

Instantly share code, notes, and snippets.

@wernerdegroot
Last active June 18, 2019 09:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wernerdegroot/9b795964e944bd47782dac3c2aa0865d to your computer and use it in GitHub Desktop.
Save wernerdegroot/9b795964e944bd47782dac3c2aa0865d to your computer and use it in GitHub Desktop.
New RxJs operator `exhaustMapWithLatest`
import { TestScheduler } from 'rxjs/testing'
import { exhaustMapWithLatest } from './exhaustMapWithLatest'
import { of } from 'rxjs'
describe('exhaustMapWithLatest', () => {
  // Every test gets its own scheduler so virtual time is never shared between cases.
  const createScheduler = (): TestScheduler =>
    new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected)
    })

  // All tests feed the same three outer values into the operator.
  const sourceMarble = '--a--b--c|'
  const sourceValues = { a: 1, b: 2, c: 3 }

  it('should act as `map` when the inner `Observable<..>` completes directly', () => {
    const scheduler = createScheduler()
    // `of(..)` emits and completes synchronously, so no outer value ever
    // arrives while an inner is still running.
    const expectedMarble = '--A--B--C|'
    const expectedValues = { A: 10, B: 20, C: 30 }

    const sourceObservable = scheduler.createColdObservable(sourceMarble, sourceValues)
    const resultObservable = sourceObservable.pipe(exhaustMapWithLatest(x => of(10 * x)))

    scheduler.expectObservable(resultObservable).toBe(expectedMarble, expectedValues)
    scheduler.flush()
  })

  it('should behave like `concatMap` when there is no more than one new outer during subscription to inner', () => {
    const scheduler = createScheduler()
    // Each inner takes five frames to complete. `b` arrives while the inner
    // for `a` is running and `c` arrives while the inner for `b` is running,
    // so exactly one value is waiting each time and nothing is dropped.
    const expectedMarble = '------A----B----C|'
    const expectedValues = { A: 10, B: 20, C: 30 }

    const sourceObservable = scheduler.createColdObservable(sourceMarble, sourceValues)
    const resultObservable = sourceObservable.pipe(
      exhaustMapWithLatest(x => scheduler.createColdObservable('----X|', { X: x * 10 }))
    )

    scheduler.expectObservable(resultObservable).toBe(expectedMarble, expectedValues)
    scheduler.flush()
  })

  it('should behave a bit like `exhaustMap` when there is more than one new outer during subscription to inner (but also apply the operator to the last outer)', () => {
    const scheduler = createScheduler()
    // Each inner takes eight frames to complete, so both `b` and `c` arrive
    // while the inner for `a` is running. Only the latest (`c`) is kept and
    // mapped afterwards; `b` is dropped and therefore has no expected value.
    const expectedMarble = '---------A-------C|'
    const expectedValues = { A: 10, C: 30 }

    const sourceObservable = scheduler.createColdObservable(sourceMarble, sourceValues)
    const resultObservable = sourceObservable.pipe(
      exhaustMapWithLatest(x => scheduler.createColdObservable('-------X|', { X: x * 10 }))
    )

    scheduler.expectObservable(resultObservable).toBe(expectedMarble, expectedValues)
    scheduler.flush()
  })
})
import { Observable, OperatorFunction, defer, empty, concat } from 'rxjs'
import { mergeMap } from 'rxjs/operators'
/**
 * Behaves a bit like `exhaustMap`, but does not lose the most recent value.
 *
 * While the `Observable<Inner>` created for one `Outer` is still running,
 * newly received `Outer` values are not mapped right away. Only the latest
 * of them is remembered; as soon as the running inner completes, that value
 * is passed to `fn` too and the resulting observable is subscribed to next.
 *
 *   input:    --a--b--c|
 *   fn:       x => -------X|
 *   expected: ---------A-------C|
 *
 * @param fn Maps an `Outer` value to the `Observable<Inner>` to run for it.
 * @returns An operator that turns an `Observable<Outer>` into an `Observable<Inner>`.
 */
export const exhaustMapWithLatest = <Outer, Inner>(
  fn: (outer: Outer) => Observable<Inner>
): OperatorFunction<Outer, Inner> => (source: Observable<Outer>): Observable<Inner> =>
  // The outer `defer` gives every subscription its own copy of the state below.
  defer(() => {
    // True from the moment an inner is started until it has completed.
    let innerActive = false
    // Latest outer received while an inner was active. It is boxed so that an
    // `Outer` value that is itself `null` can be told apart from "nothing pending".
    let pending: { value: Outer } | null = null

    // Runs after an inner completes: releases the flag and, when a value was
    // remembered in the meantime, starts processing that value.
    const continuation = defer((): Observable<Inner> => {
      innerActive = false
      if (pending === null) {
        return empty()
      }
      return handleOuter(pending.value)
    })

    function handleOuter(outer: Outer): Observable<Inner> {
      if (innerActive) {
        // Busy: only remember this value (overwriting any earlier one) and emit nothing.
        pending = { value: outer }
        return empty()
      }

      innerActive = true
      pending = null
      // `concat` subscribes to `continuation` only once `fn(outer)` has completed.
      return concat(fn(outer), continuation)
    }

    return source.pipe(mergeMap(handleOuter))
  })
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment