Skip to content

Instantly share code, notes, and snippets.

@whiteinge
Created March 21, 2017 17:07
Show Gist options
  • Save whiteinge/9dab34105233871f3d660d9b1056a7ad to your computer and use it in GitHub Desktop.
Save whiteinge/9dab34105233871f3d660d9b1056a7ad to your computer and use it in GitHub Desktop.
waitLatestFrom RxJS operator as a middle-ground between withLatestFrom and combineLatest
/**
waitLatestFrom(...sources, [selector])
Like withLatestFrom but waits for all sources to produce at least one value
before emitting. Emits immediately once it has a value for all sources. After
that it actively collects the latest emission from the other source observables
in the background and emits the latest collected values whenever the primary
source emits -- like withLatestFrom.
If the primary source completes, the other sources will be completed as well.
If the primary source completes before the other sources produce, it will
continue to wait for one value from each source, then emit, then complete.
Usage:
Rx.Observable.interval(1000).map('first').take(1)
.waitLatestFrom(
Rx.Observable.timer(2000).map('second').take(1),
Rx.Observable.timer(3000).map('third').take(1))
.subscribe(x => console.log('x', x));
**/
export function waitLatestFrom(...sources) {
var that = this;
var selector = _.isFunction(_.last(sources)) ? sources.pop() : null;
return Rx.Observable.create(function(observer) {
// Set a flag so we can avoid checking latestVals for every emission.
var doneInitialEmit = false;
// Set a flag if onCompleted is called before we make the initial emit.
var shouldComplete = false;
// Keep all subscriptions in one place for easy bulk disposal.
var allSubs = new Rx.CompositeDisposable();
// Keep track of the latest value from each source observable.
var initialPrimaryVal;
var latestVals = [];
_.forEach(sources, function(source, idx) {
latestVals.push(undefined);
var sub = source
.subscribe(val => {
latestVals[idx] = val;
// Redundant conditional but it saves a seldomly-called
// function in a potentially very busy pipeline.
if (!doneInitialEmit) { initialEmit(); }
});
allSubs.add(sub);
});
allSubs.add(that.subscribe({onNext, onError, onCompleted}));
return allSubs;
// --------------------------------------------------------------------
/**
Once we've emitted once only emit when the primary source emits
**/
function onNext(val) {
if (!doneInitialEmit) {
initialPrimaryVal = val;
initialEmit();
} else {
emit(val);
}
}
function onError(err) { observer.onError(err); }
function onCompleted() {
if (!doneInitialEmit) {
shouldComplete = true;
} else {
observer.onCompleted();
}
}
/**
Call onNext optionally using the selector function
**/
function emit(val) {
if (selector) {
observer.onNext(selector(val, ...latestVals));
} else {
observer.onNext([val, ...latestVals]);
}
}
/**
Emit as soon as we have one value from all sources, including the
primary source. From then on short-circuit; the regular onNext behavior
will trigger instead.
**/
function initialEmit() {
if (!doneInitialEmit &&
initialPrimaryVal !== undefined &&
_.every(latestVals, _.negate(_.isUndefined))) {
doneInitialEmit = true;
emit(initialPrimaryVal);
// No need to track this after initial emit.
initialPrimaryVal = undefined;
// If onCompleted was called while waiting, complete now.
if (shouldComplete) { observer.onCompleted(); }
}
}
});
}
import {describe, it} from 'mocha';
import * as collectionAssert from 'rx-collectionassert';
import * as Rx from 'rx';
import {ReactiveTest} from 'rx';
import {waitLatestFrom} from '../../js/utils/rx-ext';
describe('waitLatestFrom', function() {
Rx.Observable.prototype.waitLatestFrom = waitLatestFrom;
it('Should wait for sources', function() {
var scheduler = new Rx.TestScheduler();
var s1 = scheduler.createHotObservable(
ReactiveTest.onNext(250, 'first'),
);
var s2 = scheduler.createHotObservable(
ReactiveTest.onNext(260, 'second'),
);
var s3 = scheduler.createHotObservable(
ReactiveTest.onNext(270, 'third'),
);
var results = scheduler.startScheduler(function() {
return s1.waitLatestFrom(s2, s3);
}, {created: 100, subscribed: 200, disposed: 500});
collectionAssert.assertEqual(results.messages, [
ReactiveTest.onNext(270, ['first', 'second', 'third']),
]);
});
it('Should wait for primary', function() {
var scheduler = new Rx.TestScheduler();
var s1 = scheduler.createHotObservable(
ReactiveTest.onNext(270, 'third'),
);
var s2 = scheduler.createHotObservable(
ReactiveTest.onNext(260, 'second'),
);
var s3 = scheduler.createHotObservable(
ReactiveTest.onNext(250, 'first'),
);
var results = scheduler.startScheduler(function() {
return s1.waitLatestFrom(s2, s3);
}, {created: 100, subscribed: 200, disposed: 500});
collectionAssert.assertEqual(results.messages, [
ReactiveTest.onNext(270, ['third', 'second', 'first']),
]);
});
it('Should emit only when primary emits', function() {
var scheduler = new Rx.TestScheduler();
var s1 = scheduler.createHotObservable(
ReactiveTest.onNext(250, 'first'),
ReactiveTest.onNext(290, 'fifth'),
);
var s2 = scheduler.createHotObservable(
ReactiveTest.onNext(260, 'second'),
ReactiveTest.onNext(280, 'fourth'),
);
var s3 = scheduler.createHotObservable(
ReactiveTest.onNext(270, 'third'),
);
var results = scheduler.startScheduler(function() {
return s1.waitLatestFrom(s2, s3);
}, {created: 100, subscribed: 200, disposed: 500});
collectionAssert.assertEqual(results.messages, [
ReactiveTest.onNext(270, ['first', 'second', 'third']),
ReactiveTest.onNext(290, ['fifth', 'fourth', 'third']),
]);
});
it('Should discard multiple primary emissions while waiting', function() {
var scheduler = new Rx.TestScheduler();
var s1 = scheduler.createHotObservable(
ReactiveTest.onNext(251, 'first-1'),
ReactiveTest.onNext(252, 'first-2'),
ReactiveTest.onNext(253, 'first-3'),
);
var s2 = scheduler.createHotObservable(
ReactiveTest.onNext(260, 'second'),
);
var s3 = scheduler.createHotObservable(
ReactiveTest.onNext(270, 'third'),
);
var results = scheduler.startScheduler(function() {
return s1.waitLatestFrom(s2, s3);
}, {created: 100, subscribed: 200, disposed: 500});
collectionAssert.assertEqual(results.messages, [
ReactiveTest.onNext(270, ['first-3', 'second', 'third']),
]);
});
it('Should work with a selector fn', function() {
var scheduler = new Rx.TestScheduler();
var s1 = scheduler.createHotObservable(
ReactiveTest.onNext(260, 'second'));
var s2 = scheduler.createHotObservable(
ReactiveTest.onNext(250, 'first'));
var results = scheduler.startScheduler(function() {
return s1.waitLatestFrom(s2, function(one, two) {
return {one, two};
});
}, {created: 100, subscribed: 200, disposed: 500});
collectionAssert.assertEqual(results.messages, [
ReactiveTest.onNext(260, {one: 'second', two: 'first'}),
]);
});
it('Should dispose all sources', function() {
var scheduler = new Rx.TestScheduler();
var s1 = scheduler.createHotObservable(
ReactiveTest.onNext(260, 'second'),
ReactiveTest.onCompleted(270));
var s2 = scheduler.createHotObservable(
ReactiveTest.onNext(250, 'first'));
var results = scheduler.startScheduler(function() {
return s1.waitLatestFrom(s2);
}, {created: 100, subscribed: 200, disposed: 500});
collectionAssert.assertEqual(results.messages, [
ReactiveTest.onNext(260, ['second', 'first']),
ReactiveTest.onCompleted(270),
]);
collectionAssert.assertEqual(s2.subscriptions, [
ReactiveTest.subscribe(200, 270),
]);
});
it('Should wait to complete if primary source completes', function() {
var scheduler = new Rx.TestScheduler();
var s1 = scheduler.createHotObservable(
ReactiveTest.onNext(250, 'first'),
ReactiveTest.onCompleted(255),
);
var s2 = scheduler.createHotObservable(
ReactiveTest.onNext(260, 'second'),
);
var results = scheduler.startScheduler(function() {
return s1.waitLatestFrom(s2);
}, {created: 100, subscribed: 200, disposed: 500});
collectionAssert.assertEqual(results.messages, [
ReactiveTest.onNext(260, ['first', 'second']),
ReactiveTest.onCompleted(260),
]);
});
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment