The docs for .share()
state: 'Returns a new Observable that multicasts (shares) the original Observable.'.
This sounds like share()
is a lovely simple method that'll make your life easier. However share()
is really a trap, for if you have the hubris to use .share()
the angry complexity bees that live inside RxJS will swarm out and sting you in the eye (in a rate-limited fashion).
Let's see why. Our goal is to take a cold observable, take only the first item from it, do something to it, and share the resulting observable between 2 subscribers. They should both share the values from our new observable. Easy huh?
{
const coldObservable = Rx.Observable.create(function(observer) {
console.log('observable created')
observer.next(10)
});
const shared = coldObservable
.take(1)
.map(x => x + 1)
.share();
shared.subscribe((x) => console.log('eg1: sub 1', x))
shared.subscribe((x) => console.log('eg1: sub 2', x))
}
Fantastic! share()
seems a lovely solution. Let's try it:
// paste into console with Rx in scope, results in:
observable created
eg1: sub 1 11
observable created
eg1: sub 2 11
Um? So we have 2 observables. That doesn't seem like sharing...
Why does this happen? Weeeeeeellll.... our shared observable .complete()
s after one item is emitted due to take(1)
. Importantly,
it's also synchronous.
This means after the first subscribe the observable has completed, synchronously. The second subscription call causes the hot observable to be recreated, which resubscribes to the cold-observable factory, and we see the alarming effect we observed above.
Let's read a version of the example with annotations to reveal the runtime behaviour:
{
let observablesCreated = 0;
const coldObservable = Rx.Observable.create(function(observer) {
observablesCreated += 1;
observer.next(10)
})
const shared = coldObservable
.take(1)
.map((x) => x + 1)
.share()
shared.subscribe((x) => console.log('eg1: sub 1', x))
// RUNTIME ANNOTATION:
// shared - subscribed() - ok, let's ask coldObservable for a new instance
// coldObservable - *created*
// coldObservable - next(1)
// shared - next(11)
// shared - complete()
shared.subscribe((x) => console.log('eg1: sub 2', x))
// RUNTIME ANNOTATION:
// shared - subscribed() - ah, my previous instance is complete. ok, let's ask coldObservable
// > for a new instance
// coldObservable - *created*
// coldObservable - next(1)
// shared - next(11)
// shared - complete()
// and thus this assertion fails
console.assert(observablesCreated === 1, `eg1: What the heck?! Though shared, ${observablesCreated} observables were created`);
''
}
If we remove .take(1), everything is 'fine', let's see why:
{
console.log('')
console.log('start of eg2')
let observablesCreated = 0;
// here's our Observable, waiting for subscriptions
const coldObservable = Rx.Observable.create(function(observer) {
console.log('eg2: created observable')
observablesCreated += 1;
observer.next(10)
})
// .share - "Returns a new Observable that multicasts (shares) the original Observable."
const shared = coldObservable
.map((x) => (console.log('eg1: mapped'), x + 1))
.share()
// now, since our observable doesn't complete these are both subscribing to the same observable instance
shared.subscribe((x) => console.log('eg2: sub 1', x))
shared.subscribe((x) => console.log('eg2: sub 2', x))
// so, we get a happy ending!
console.assert(observablesCreated === 1, `eg2: What the heck?! Though shared, ${observablesCreated} observables were created`);
''
}
Bah!