Skip to content

Instantly share code, notes, and snippets.

@brentd
Last active November 7, 2017 20:41
Show Gist options
  • Save brentd/189de80b14c5876f2370c3f774a7f3b0 to your computer and use it in GitHub Desktop.
Save brentd/189de80b14c5876f2370c3f774a7f3b0 to your computer and use it in GitHub Desktop.
combineLatestActive: like combineLatest, but for a stream of streams, and only combines non-completed streams
import { Observable } from 'rxjs'
/**
 * Combines the latest values of the *active* inner observables of a
 * higher-order observable.
 *
 * Subscribes to every inner observable emitted by `source$` and emits an array
 * containing the last emitted value of each inner subscription that has
 * emitted and has not yet completed. An array is emitted when:
 *
 * 1. any inner observable emits
 * 2. any inner observable completes (even if it never emitted a value)
 *
 * In contrast to `combineLatest`, the latest values of completed inner
 * observables are not included in the emitted array. Values appear in the
 * order in which their inner subscriptions first emitted.
 *
 * The result completes once `source$` and all of its inner observables have
 * completed. An error from `source$` or from any inner observable is forwarded
 * to the subscriber.
 *
 * @param {Observable} source$ - Observable whose values are themselves observables.
 * @returns {Observable} Observable of arrays of the latest active values.
 */
export default function combineLatestActive(source$) {
  return new Observable(observer => {
    // Latest value per inner subscription. The key is the emission index that
    // `mergeMap` hands to its project function rather than the inner observable
    // itself, so the same observable instance emitted twice by `source$` gets
    // two independent slots instead of clobbering (and prematurely deleting)
    // a shared one.
    const latest = new Map()

    const emit = () => observer.next(Array.from(latest.values()))

    const trackInner = (inner$, index) =>
      inner$.do({
        next: value => {
          latest.set(index, value)
          emit()
        },
        complete: () => {
          latest.delete(index)
          emit()
        }
      })

    // The merged values themselves are ignored; output is produced by the `do`
    // side effects above. Errors and completion are forwarded explicitly so an
    // error is not reported downstream as a normal completion. Returning the
    // subscription lets an unsubscribe tear down the source and all inners.
    return source$.mergeMap(trackInner).subscribe({
      error: err => observer.error(err),
      complete: () => observer.complete()
    })
  })
}
import assert from 'assert'
import { marbles } from 'rxjs-marbles'
import combineLatestActive from 'rx/combineLatestActive'
// Marble-test suite for combineLatestActive. Each character position in a
// marble string is one frame of virtual time.
describe('combineLatestActive()', () => {
  // Walks through two overlapping inner streams and checks every snapshot the
  // operator produces, including the shrinking arrays as each inner completes.
  it('emits the latest value from each active inner observable', marbles(m => {
    const first$ = m.cold('a--b--c--|')
    const second$ = m.cold('x--y--z|')
    const inners = {'1': first$, '2': second$}
    const higherOrder$ = m.cold('1---2--------', inners)

    const snapshots = {
      '1': ['a'],
      '2': ['b'],
      '3': ['b', 'x'],
      '4': ['c', 'x'],
      '5': ['c', 'y'],
      '6': ['y'],
      '7': ['z'],
      '8': []
    }
    const expected$ = m.cold('1--23-45-678', snapshots)

    const actual$ = combineLatestActive(higherOrder$)
    m.equal(actual$, expected$)
  }))

  // The outer source completes first; the subscription marbles show that each
  // inner is still held until its own completion frame.
  it('stays subscribed to inner observables until they complete ', marbles(m => {
    const first$ = m.cold('----|')
    const firstSubscription = '^---!'
    const second$ = m.cold('----|')
    const secondSubscription = '--^---!'
    const higherOrder$ = m.cold('1-2|', {'1': first$, '2': second$})

    const result$ = combineLatestActive(higherOrder$)
    result$.subscribe()

    m.has(first$, firstSubscription)
    m.has(second$, secondSubscription)
  }))

  // Neither inner emits a value, so the only output is an empty array on each
  // inner completion, the last one sharing a frame with the final completion.
  it('completes when the source AND inner observables have completed', marbles(m => {
    const first$ = m.cold('-----|')
    const second$ = m.cold('-------|')
    const higherOrder$ = m.cold('1--2----|', {'1': first$, '2': second$})
    const expected$ = m.cold('-----a----(a|)', {a: []})

    const actual$ = combineLatestActive(higherOrder$)
    m.equal(actual$, expected$)
  }))

  // The outer source outlives both inners; each inner subscription must end
  // at the inner's own completion rather than at the end of the source.
  it('unsubscribes from completed inner observables', marbles(m => {
    const short$ = m.cold('a-b|')
    const shortSubscription = '^--!'
    const long$ = m.cold('a-b-c-d|')
    const longSubscription = '--^------!'
    const higherOrder$ = m.cold('1-2----------|', {'1': short$, '2': long$})

    const result$ = combineLatestActive(higherOrder$)
    result$.subscribe()

    m.has(short$, shortSubscription)
    m.has(long$, longSubscription)
  }))
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment