@ntilwalli
Last active January 24, 2016 18:13
Going from Cold to Hot

### A Walkthrough of Observables, Subjects, Replay and Sharing Streams in RxJS

To properly understand observables in RxJS you need to understand cold vs. hot observables, how subscribe calls propagate, and how that propagation is modified by Subjects.

#### Cold vs. Hot

Traditionally "observables" are discussed as being either "cold" or "hot". But a discussion of hot and cold becomes clearer when you separate the term "observable" into two distinct concepts: streams and sources.

If you've ever seen a stream of water then you've experienced the ultimate in hot observables (their actually cold temperatures notwithstanding). What makes them hot observables?

  • They flow whether you're observing them or not.
  • They don't generally repeat.

Heraclitus, an ancient Greek philosopher, became famous for pointing out the second property when he said that a person can never step into the same stream twice. He didn't realize it at the time, but he was basically making an observation about hot observables.

Hot observables are, in general, streams connected to non-repeatable sources that emit independently of anyone observing them. Streams of water are usually sourced from snowmelt or aquifer springs, and the only way to repeat the process of a specific molecule of snow melting or springing at a specific moment in a specific way would be to travel back in time. On top of that, snow melts, rivers flow, and the world keeps going, whether I want it to or not.

Cold observables are exactly the opposite: they are streams connected to repeatable sources, and those sources restart for each subscriber. Cold observables are simple and predictable, which is why they're great for testing and for learning about stream operators, and it's why the first observables users become familiar with are usually cold ones. That said, predictable streams are not interesting streams, and interesting applications can only be built on interesting streams. In practice you'll spend most of your time working with hot observables.

But again, cold observables are useful for writing tests and learning. Here are a few examples of streams generated from repeatable sources:

"use strict"
const Rx = require('rx')
const test = require('tape-catch')

// Interval every 100ms (internal dependency on system timers/clock)
const len = 4
const interval100$ = Rx.Observable.interval(100).take(len)

function runIntervalTest() {
  test('interval', assert => {
    assert.plan(1)
    let out = []
    interval100$
      .subscribe(x => {
        out.push(x)

        // This sequence is generated asynchronously because
        // interval uses setTimeout, which fires on the next
        // tick (event loop turn), so the test is done
        // inline (also asynchronously).
        if(out.length === len) {
          assert.deepEqual(out, [0, 1, 2, 3], 'should have 0-3 sequence')
        }
      })
  })
}

runIntervalTest()

// Fibonacci sequence
const makeFib$ = numOutputs => Rx.Observable.create((observer) => {
  let vals = [0, 1]
  for (let x=0; x<numOutputs; x++) {
    if (x<2) {
      observer.onNext(vals[x])
    } else {
      observer.onNext((vals[x%2] = vals.reduce((prev, curr) => prev + curr, 0)))
    }
  }
  observer.onCompleted()
})

const fib$ = makeFib$(4)

function runFibTest() {
  test('Fibonacci$', assert => {
    assert.plan(1)
    let out = []
    fib$.subscribe(x => {
        out.push(x)
      })

    // Above sequence is created/completed synchronously
    // due to the default RxJS scheduler in use so test
    // can happen synchronously as well
    assert.deepEqual(out, [0, 1, 1, 2], 'should have fibonacci values')
  })
}

runFibTest()

As described above, cold observables are repeatable observables. We can easily demonstrate that: if we subscribe a second time to the same observables from above, they'll output the same values.

runIntervalTest()
runFibTest()

Cold observables can also help us understand how "subscribe" calls propagate up stream operator pipelines.

const inc5Just$ = Rx.Observable.just(0)
  .take(4)
  .map(x => x + 5)

In the above stream pipeline the source is cold (just) and it's transformed by two operators. Each operator creates a new observer, and each observer in turn generates a new "subscribe" call to its source. This means calling subscribe on the returned stream should generate three internal subscribe calls in total. We can see this by modifying the "subscribe" method on Rx.Observable.prototype:

function createSubscribeProxies () {
  let subscriptions = 0;
  const oProto = Rx.Observable.prototype
  const oldSubscribe = oProto.subscribe
  const newSubscribe = function () {
    subscriptions++
    return oldSubscribe.apply(this, arguments)
  }

  function setSubscribeProxy () {
    subscriptions = 0
    oProto.subscribe = newSubscribe
  }

  function removeSubscribeProxy () {
    oProto.subscribe = oldSubscribe
  }

  function getSubscriptionsCount () {
    return subscriptions
  }

  return {
    setSubscribeProxy,
    removeSubscribeProxy,
    getSubscriptionsCount
  }
}

const subscribeProxies = createSubscribeProxies()
const setSubscribeProxy = subscribeProxies.setSubscribeProxy
const removeSubscribeProxy = subscribeProxies.removeSubscribeProxy
const getSubscriptionsCount = subscribeProxies.getSubscriptionsCount

function runCountSubscriptionsTest () {
  test('Internal subscribe calls', assert => {
    setSubscribeProxy()
    inc5Just$.subscribe()
    assert.plan(1)
    assert.equal(getSubscriptionsCount(), 3, 'should have 3 internal subscribe propagations')
    removeSubscribeProxy()
  })
}

runCountSubscriptionsTest()

Subscribe call propagation is an important idea in the context of understanding observables in RxJS.

One way to think of a call to subscribe is as the opening of a valve: water can only flow through a pipe if the valve that controls it is open. Calling an RxJS operator on an existing pipeline is like adding a new pipe segment with a built-in outflow valve that opens when a subscribe call is seen. Since RxJS operators, by convention, open their outflow valve on seeing a subscribe call AND propagate the subscribe call upwards, opening a valve (calling subscribe) at the bottom of a pipeline can cause a cascade of valve openings higher up the chain.
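The valve cascade can be sketched in a few lines of plain JavaScript. This is a hypothetical miniature, not RxJS itself: `coldObservable`, `map` and `subscribeLog` are names invented here for illustration.

```javascript
// Hypothetical sketch (not RxJS) of subscribe-call propagation up a pipeline.
const subscribeLog = []

// A cold source: it runs its producer for every subscriber.
function coldObservable(name, values) {
  return {
    subscribe(observer) {
      subscribeLog.push(name)              // this "valve" opens here
      values.forEach(v => observer.next(v))
      observer.complete()
    }
  }
}

// A map "pipe segment": subscribing opens its own valve,
// then the subscribe call cascades up to its source.
function map(source, fn) {
  return {
    subscribe(observer) {
      subscribeLog.push('map')
      source.subscribe({
        next: v => observer.next(fn(v)),
        complete: () => observer.complete()
      })
    }
  }
}

const out = []
map(coldObservable('source', [1, 2, 3]), x => x + 5)
  .subscribe({ next: v => out.push(v), complete: () => {} })

console.log(subscribeLog) // ['map', 'source']
console.log(out)          // [6, 7, 8]
```

One subscribe call at the bottom produced two valve openings, mirroring the cascade counted by the subscribe proxy above.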

#### Subjects

The "valves" in the above analogy are implicit. Subscribe calls open the implicit valves, and calling "dispose" on the resulting Subscription objects closes them. However, there are many situations in the construction of stream pipelines where you want more control than this: where you want access to a valve for a valve's sake.

Valves are very useful in and of themselves in the construction of stream pipelines because they let us independently control where a stream comes from, where a stream goes, and when to enable flow between the two sides. RxJS has the equivalent of a valve in its library, called a Subject. Subjects have the properties of both Observer and Observable.

I should add that, by default, Subjects do not offer the third capability described above (independent control of when flow starts and stops between the two sides), but Subjects are the key ingredient of "connectable" observables, discussed later, which do enable this kind of control.
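The observer-plus-observable duality is easiest to see in a toy implementation. The following is a hypothetical minimal sketch, not the real RxJS Subject; `makeSubject` is a name invented here.

```javascript
// Hypothetical minimal Subject sketch (not the RxJS implementation):
// it is an observer (next/complete) AND an observable (subscribe).
function makeSubject() {
  const observers = []
  return {
    // observable side: sinks attach here
    subscribe(observer) { observers.push(observer) },
    // observer side: a source pushes values in here
    next(v) { observers.forEach(o => o.next(v)) },
    complete() { observers.forEach(o => o.complete()) }
  }
}

const subject = makeSubject()
const out = []
subject.subscribe({ next: v => out.push(v), complete: () => out.push('done') })

// Drive the subject as if it were the observer of some source.
subject.next(3)
subject.next(4)
subject.next(5)
subject.complete()
console.log(out) // [3, 4, 5, 'done']
```

Values pushed into the observer side flow straight out of the observable side, which is exactly the valve behavior described above.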

Let's see a Subject in action...

const fromArray$ = Rx.Observable.from([3, 4, 5, 6, 7]).take(4)

function runSubjectAsValveTest () {
  test('Subject is like a valve', assert => {
    setSubscribeProxy()

    // First let's create a Subject. Just as a valve acts as a proxy for a pipe,
    // a Subject acts as a proxy for an input/output, source/sink or, in RxJS
    // terminology, an observable/observer.

    const subject = new Rx.Subject()

    // Now let's subscribe to the subject as if it's a regular observable

    const out = []
    const subscription = subject.subscribe(
      x => {
        console.log(x)
        out.push(x)
        if (out.length === 4) {
          assert.deepEqual(out, [3, 4, 5, 6], 'source/sink connection should output 3-6 sequence')
        }
      },
      undefined,
      () => {
        assert.pass('should receive completion message')
        assert.end()
      }
  )

    assert.equal(getSubscriptionsCount(), 1, 'should have one subscribe call after sink is connected')

    // Now let's treat the same subject as a sink (observer) and use it to
    // subscribe to an actual source of data. Let's use the fromArray$
    // from above as our source.

    fromArray$.subscribe(subject)
    assert.equal(getSubscriptionsCount(), 3, 'should have three subscribe calls after source is connected')

    // The moment "subscribe" is called on fromArray$, subscribe calls are
    // propagated up to the cold source (first on the 'take', then on the
    // 'from') which begins emitting values which flow down to the subject,
    // which, acting as a valve, passes them through to the sink which reacts
    // accordingly.
    //
    // It is important to note that the order of connecting source and sink to
    // a simple Subject matters, as discussed below.
  })
}

runSubjectAsValveTest()

One interesting thing to note about the test above is how Subjects, in a sense, act as a terminus for subscribe call propagation. The subscribe call from the sink does not get passed up the source chain; it stops at the Subject. Why? Since subjects need to be explicitly subscribed to sources, by the time the sink subscribes to the subject, the source chain has already been subscribed to, so the Subject has no need to propagate subscribe calls further up the chain. It should be noted that this behavior does NOT apply to subscription disposals (the first subscription disposal from a sink is, by default, passed up the source chain, which is addressed further down).

Also note that just as 'onNext' messages pass through subjects, so do completion messages (the same applies to error messages).

Another interesting thing about the above example is that, though Subjects let you separate the process of connecting sources to sinks, by themselves they still require that sinks be connected to the Subject before the source is connected. Strictly, this is only required if the source emits immediately on receiving a subscribe call AND you don't want to risk dropping values on the floor. This concern applies to hot sources, but is especially apparent with cold sources. Let's think about why...

On any given turn of the event loop, if a subscribe call is initiated on a stream that originates at a cold source, data will be emitted from that source immediately if values are available. In the same turn, those values will reach the Subject, and without a sink attached those values have nowhere to go, so the Subject will simply drop them.

A plain Subject on its own doesn't add any behavior like caching or buffering values. If a subject has no sinks to send values to, it drops them on the floor.

Let's demonstrate this by running the same test as above, but with the sink and source connected in the opposite order.

function runSubjectDroppingValuesTest () {
  test('Subject drops values without a sink', assert => {
    setSubscribeProxy()

    // First let's create a Subject, just like the last example.

    const subject = new Rx.Subject()

    // Now let's set up the source (before the sink)
    const out = []
    fromArray$.do(
      x => {
        out.push(x)
        if (out.length === 4) {
          assert.deepEqual(out, [3, 4, 5, 6], 'should see 3-6 sequence from source')
        }
      },
      undefined,
      () => assert.pass('this complete should be called')
    ).subscribe(subject)

    // Now let's set up the sink after the source
    const subscription = subject.subscribe(
      x => assert.fail('this next should never be called'),
      undefined,
      () => {
        assert.pass('this complete should be called')
        assert.end()
      }
    )
  })
}

runSubjectDroppingValuesTest()

Note that the onNext messages are dropped but the onCompleted message does make it through. This might be surprising. RxJS, by default, performs as much of its message emission as possible synchronously, which means that on seeing the initial subscribe call, the cold source calls onNext four times and then immediately calls onCompleted. If the sink is not connected to the subject (as evidenced by the dropped onNext messages), how did the onCompleted message make it through? It turns out the Subject has some state. Part of that state keeps track of whether it has seen an onError or onCompleted message, in which case any subscribe call it receives from a downstream observer will cause an onError/onCompleted message to be sent back immediately. In this case the sink does not receive an onCompleted message from the source; it actually receives it from the Subject directly (since the Subject has previously received an onCompleted message from the source).
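That stopped-state bookkeeping can be sketched in a few lines. Again this is a hypothetical miniature for illustration (`makeStoppableSubject` is invented here), not the RxJS internals.

```javascript
// Hypothetical sketch of why a late subscriber still sees completion:
// the subject records that it has stopped and replays the terminal
// message to anyone who subscribes afterwards.
function makeStoppableSubject() {
  const observers = []
  let stopped = false
  return {
    subscribe(observer) {
      if (stopped) { observer.complete(); return } // replay terminal state
      observers.push(observer)
    },
    next(v) { if (!stopped) observers.forEach(o => o.next(v)) },
    complete() {
      stopped = true
      observers.forEach(o => o.complete())
    }
  }
}

const subject = makeStoppableSubject()
// Source emits and completes before any sink is attached...
subject.next(3)
subject.next(4)
subject.complete()

// ...so a late sink misses the values but still gets the completion.
const out = []
subject.subscribe({ next: v => out.push(v), complete: () => out.push('completed') })
console.log(out) // ['completed']
```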

Note that this discarding of values is a problem only if the source emits before the sink is attached to the subject. But a question arises. Is there a way to avoid this kind of ordering sensitivity? The answer is yes.

#### Replay Subjects

Let's restate the question we're trying to answer: "How can we ensure that sinks which subscribe to a subject after a source begins emitting do not miss values coming from the source?" One clear way would be to cache values that come into the subject and, upon seeing any sink subscription, emit the cached values up front before passing on any further emissions from the source to the sink. This is exactly what a ReplaySubject does. On construction it must be given the size of the cache, and thereafter any subscriber will first see the emissions it has missed (from the cache) and then see emissions from the source pass through as normal.
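The caching mechanism is simple enough to sketch directly. This is a hypothetical toy (`makeReplaySubject` is a name invented here), not the RxJS class:

```javascript
// Hypothetical sketch of the replay mechanism: cache up to `size`
// values and replay them to each new subscriber before going live.
function makeReplaySubject(size) {
  const observers = []
  const cache = []
  return {
    subscribe(observer) {
      cache.forEach(v => observer.next(v)) // replay missed values first
      observers.push(observer)             // then pass live values through
    },
    next(v) {
      cache.push(v)
      if (cache.length > size) cache.shift() // evict the oldest beyond `size`
      observers.forEach(o => o.next(v))
    }
  }
}

const subject = makeReplaySubject(3)
const sourceValues = [3, 4, 5, 6]
sourceValues.forEach(v => subject.next(v)) // source emits before any sink attaches

const out = []
subject.subscribe({ next: v => out.push(v) })
console.log(out) // [4, 5, 6] -- the oldest value (3) fell out of the cache
```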

Let's see this at work.

function runReplaySubjectNotDroppingValuesTest () {
  test('ReplaySubject does not drop values', assert => {
    setSubscribeProxy()

    // First let's create a ReplaySubject, just like the last example, with
    // a size of 3. We'll send a stream of four values, so one of them should
    // be dropped, which is discussed below.

    const subject = new Rx.ReplaySubject(3)

    // Now show that all four values are emitted from the source
    const sourceOut = []
    fromArray$.do(x => {
      sourceOut.push(x)
      if (sourceOut.length === 4) {
        assert.deepEqual(sourceOut, [3, 4, 5, 6], 'should see 3-6 sequence from source')
      }
    }).subscribe(subject)

    // Now show that only three get seen by the sink...
    const sinkOut = []
    const subscription = subject.subscribe(
      x => {
        sinkOut.push(x)
        if (sinkOut.length === 3) {
          assert.deepEqual(sinkOut, [4, 5, 6], 'should see 4-6 sequence from sink')
        }
      },
      undefined,
      () => {
        assert.pass('should see completed message')
        assert.end()
      }
  )

  })
}

runReplaySubjectNotDroppingValuesTest()

As you can see, the limitation of this approach is cache size. In the above example, the ReplaySubject's cache is smaller than the number of values emitted from the source prior to the sink's subscription, so one of the values gets dropped. But if you know beforehand what a good buffer size is, a ReplaySubject solves the ordering sensitivity issue quite well.

Just as a side note, there are a few other types of Subjects beyond the plain Subject and ReplaySubject (BehaviorSubject and AsyncSubject, for example). Extending Subject can be useful when you want to decouple source from sink and introduce some behavior that should take place during the pass-through.

#### Subjects and Multiple Observers

One of the nice things about Subjects is that they allow multiple observers to be subscribed to the same source stream; internally, Subjects keep an array of observers, and values from the source are broadcast to all of them. This is convenient, but it can introduce some surprising behavior. What happens if one of the downstream subscribers completes before the others do? That is, if one subscriber causes a disposal message to be emitted upstream before the source completes, does the source stream recognize that it still has valid subscribers, or does it shut itself down because one subscriber decided to opt out of messages? The answer, by default, is the latter. The first subscription disposal propagates through the subject to the source, which shuts down the stream. Simple subjects are naive that way. Observers of a subject can thus be affected by the actions of a sibling.

Here's an example:

function runSubjectSiblingUnsubscriptionLeakageTest () {
  test('Subject unsubscription impacts siblings...', assert => {
    setSubscribeProxy()

    const subject = new Rx.Subject()

    const out1 = []

    // Only take 2 of the 4 values before completing...
    const subscription1 = subject.take(2).subscribe(
      x => {
        out1.push(x)
      },
      undefined,
      () => {
        assert.deepEqual(out1, [3, 4], 'one subscriber should only take 2 values from stream')
      }
    )

    const out2 = []

    // Take all the values coming down the pipe... but this doesn't actually
    // happen...
    const subscription2 = subject.subscribe(
      x => {
        out2.push(x)
      },
      undefined,
      () => {
        assert.notDeepEqual(out1, [3, 4, 5, 6], 'expect other subscriber to take all values, but only takes 2 from stream?')
        assert.end()
      }
    )

    // Demonstrate that all four values do get emitted...
    const outSource = []
    fromArray$.do(
      x => {
        outSource.push(x)
        if (outSource.length === 4) {
          assert.deepEqual(outSource, [3, 4, 5, 6], 'should see 3-6 sequence from source')
        }
      }
    ).subscribe(subject)
  })
}

runSubjectSiblingUnsubscriptionLeakageTest()

How can we fix this issue? This brings us to one of the more important types of observables. Earlier, when we first introduced Subjects, we indicated that there is a way to introduce flow control into a stream independent of subscribe calls being propagated up. That is, there is a way to decouple a stream from sensitivity to downstream subscribe (and unsubscribe) calls, and in doing so gain the power to decide when to open (or close) the valves of a source stream independent of our downstream sinks. Any observable can be turned into one of these "flow-controllable" observables by calling the "publish" operator. When the publish method is called on a regular Observable pipeline it becomes a "ConnectableObservable", so called because upstream subscribe call propagation does not happen when a ConnectableObservable sees a subscribe call. It can only be started by calling the "connect" method. In this way subscribe calls and flow initiation are decoupled.

Let's see this operator in action:

function runConnectableCountSubscriptionsTest () {
  const connectableFromArray$ = fromArray$.publish()
  test('Internal subscribe calls for connect', assert => {
    setSubscribeProxy()
    assert.plan(3)
    assert.equal(getSubscriptionsCount(), 0, 'should have zero internal subscribe calls to start')
    connectableFromArray$.subscribe()

    assert.equal(getSubscriptionsCount(), 2, 'should have two internal subscribe before connect')
    connectableFromArray$.connect()
    assert.equal(getSubscriptionsCount(), 5, 'should have five internal subscribe propagations')
    removeSubscribeProxy()
  })
}

runConnectableCountSubscriptionsTest()

Confused? Two subscribe calls to start? Five at the end? How did that happen? ConnectableObservables are, as you might expect, not like regular observables. Internally they're implemented using a Subject. When you call "publish" on a source, it gets wrapped in such a way that subscribe calls get passed to an operator called 'multicast' (the details are unimportant, but it essentially allows source data to be transformed before it's sent through the Subject), which forwards them on to a Subject. So calling subscribe causes one subscribe call on the multicast operator, which triggers another on the Subject, but that's where it ends. Critically, subscribe calls are not propagated up to the original cold source. That propagation is triggered only when the "connect" method is called on the ConnectableObservable; i.e., "subscribe" calls on the source observable can only be triggered via connect (which in this case causes three more subscribe calls, as expected).
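Stripped to its essence, the connect/subscribe split looks like this. A hypothetical sketch (`publish` here is invented for illustration, not the RxJS implementation):

```javascript
// Hypothetical sketch of a ConnectableObservable: subscribe calls stop
// at an internal subject; only connect() subscribes to the real source.
function publish(source) {
  const observers = []
  const subject = {
    subscribe(observer) { observers.push(observer) },
    next(v) { observers.forEach(o => o.next(v)) }
  }
  return {
    subscribe(observer) { subject.subscribe(observer) }, // stops here
    connect() { source.subscribe(subject) }              // flow starts here
  }
}

// A cold source that emits synchronously on subscribe.
const source = {
  subscribe(observer) { [3, 4, 5].forEach(v => observer.next(v)) }
}

const connectable = publish(source)
const out = []
connectable.subscribe({ next: v => out.push(v) })
const beforeConnect = out.length   // 0 -- nothing flows until connect()

connectable.connect()
console.log(beforeConnect, out) // 0 [ 3, 4, 5 ]
```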

On its face, this solves the problem of subscription disposals from one sink impacting sibling sinks. Let's test that:

function runConnectableSourceToMultiSinksTest () {

  test('Decouple multiple sinks from source with ConnectableObservable', assert => {
    const connectableFromArray$ = fromArray$.publish()

    const out1 = []

    assert.plan(2)
    // Only take 2 of the 4 values before completing...
    const subscription1 = connectableFromArray$.take(2).subscribe(
      x => {
        out1.push(x)
      },
      undefined,
      () => {
        assert.deepEqual(out1, [3, 4], 'one subscriber should only see 2 values from stream')
      }
    )

    const out2 = []

    // Take all the values coming down the pipe... now it happens as expected
    const subscription2 = connectableFromArray$.subscribe(
      x => {
        out2.push(x)
      },
      undefined,
      () => {
        assert.deepEqual(out2, [3, 4, 5, 6], 'other subscriber should see all 4 values from stream')

      }
    )

    // Need to call "connect" to get the source to start
    connectableFromArray$.connect()
  })

}

runConnectableSourceToMultiSinksTest()

So now we're in pretty good shape: we have decoupled sources from sinks, and we have the flexibility to choose whether the source stream restarts for each subscription or starts only once no matter how many subscribers there are. It would seem like we're all set. We are... almost. One downside of the above approach is that even if both sinks unsubscribe, the source will continue to emit. Remember, subscribe and unsubscribe messages do not get communicated up to the source when there is a Subject intervening (which is implicitly true after any call to "publish").

Let's show this:

function runWastefulSourceEmissionsTest () {

  test('Wasteful emissions with simple ConnectableObservable', assert => {
    // Capture all values emitted from source stream...
    const out3 = []
    const connectableFromArray$ = fromArray$.do(
      x => out3.push(x),
      undefined,
      () => {
        assert.deepEqual(out3, [3, 4, 5, 6], "All four values are emitted even though only 3 are ever relevant")
      }
    ).publish()

    const out1 = []

    assert.plan(3)
    // Only take 2 of the 4 values before completing...
    const subscription1 = connectableFromArray$.take(3).subscribe(
      x => {
        out1.push(x)
      },
      undefined,
      () => {
        assert.deepEqual(out1, [3, 4, 5], 'one subscriber should only see 3 values from stream')
      }
    )

    const out2 = []

    // Take 1 of the values coming down the pipe...
    const subscription2 = connectableFromArray$.take(1).subscribe(
      x => {
        out2.push(x)
      },
      undefined,
      () => {
        assert.deepEqual(out2, [3], 'other subscriber should see 1 value from stream')

      }
    )

    // Need to call "connect" to get source to initiate source start,
    connectableFromArray$.connect()
  })
}

runWastefulSourceEmissionsTest()

The problem is that we didn't dispose of our subscription to the source when we no longer needed it.

Is this a big deal? Yes. Allowing a source to continue to emit when no subscribers are listening is wasteful, and in a larger application, done repeatedly, it can lead to performance issues. The fact is, manually starting a stream forces us to manually stop that same stream. With that power comes responsibility, and most of the time we don't need that level of control. What we want is an Observable that does not re-subscribe when the stream already has a subscriber, but which also unsubscribes when there are no subscribers left. That's exactly what is offered by the "refCount" operator, which is only available on ConnectableObservables. It's used like so:

const refCountedFromArray$ = fromArray$.publish().refCount()

This usage is so common that RxJS actually has a shorthand for the above line.

const refCountedFromArray$ = fromArray$.share() // Synonymous with above...

This version of a ConnectableObservable reference counts subscribe calls. On the first subscription it calls "connect" for you, and when the final subscriber disposes and the subject has no subscribers left, a shared observable disposes the underlying subscription for you. This means you generally don't have to manage any of the start/stop manually, but you still get the benefit of a source that can be used simultaneously by multiple observers.
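The counting logic itself is tiny. Here's a hypothetical sketch of the mechanism (the `connectable` stub and `refCount` function are invented here; a real ConnectableObservable's subscribe/connect do much more):

```javascript
// Hypothetical sketch of refCount: count subscribers, connect the
// source on the 0 -> 1 transition, disconnect on the 1 -> 0 transition.
const log = []

// A pretend connectable whose connect/disconnect we can observe.
const connectable = {
  subscribe(observer) { return { dispose() {} } },
  connect() {
    log.push('connect')
    return { dispose() { log.push('disconnect') } }
  }
}

function refCount(conn) {
  let count = 0
  let connection = null
  return {
    subscribe(observer) {
      conn.subscribe(observer)
      count++
      if (count === 1) connection = conn.connect() // first subscriber opens the source
      return {
        dispose() {
          count--
          if (count === 0) connection.dispose()    // last disposal closes it
        }
      }
    }
  }
}

const shared = refCount(connectable)
const sub1 = shared.subscribe({})
const sub2 = shared.subscribe({}) // no second connect: count went 1 -> 2
sub1.dispose()
sub2.dispose()                    // count hits zero -> disconnect
console.log(log) // ['connect', 'disconnect']
```

Note the source connects exactly once for two subscribers, and disconnects only when the last one disposes.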

Let's see an example (which does almost what we want...):

function runNotWastefulAnymoreSourceEmissionsTest () {

  test('Less wasteful but not yet perfect', assert => {
    // Capture all values emitted from source stream...
    const sourceOut = []
    const sharedFromArray$ = fromArray$.do(
      x => {
        sourceOut.push(x)
      }
    ).share()


    const out1 = []
    const out2 = []

    assert.plan(4)
    // Only take 3 of the 4 values before completing...
    const subscription1 = sharedFromArray$.take(3).subscribe(
      x => {
        out1.push(x)
      },
      undefined,
      () => {
        assert.deepEqual(out2, [], 'this has not been filled in yet...')
        assert.deepEqual(out1, [3, 4, 5], 'this comes from first subscription...')
      }
    )

    // Take 1 of the values coming down the pipe...
    const subscription2 = sharedFromArray$.take(1).subscribe(
      x => {
        out2.push(x)
      },
      undefined,
      () => {
        assert.deepEqual(sourceOut, [3, 4, 5, 3], 'this proves the source was started a second time...')
        assert.deepEqual(out2, [3], 'now it is filled in, comes from second subscription... what is going on??')

      }
    )
  })
}

runNotWastefulAnymoreSourceEmissionsTest()

So what's going on? Above we stated that a reference-counted ConnectableObservable would give us all the good of publish (no leaking sibling subscription disposals) without all the bad (having to manually manage connecting to the source and disposing the subscription), and on some level it does, but you need to be aware of how it works in order to use it properly. The above issue happened because the source we are subscribing to is cold and outputs all its values up front. Our first subscription takes 3 values and then immediately disposes of its subscription, before the second subscribe call is even made. This means the shared observable goes from a reference count of zero to one and BACK to zero before the sibling subscription is created. Remember, a shared observable reconnects when its subscription count goes from zero to one. So when the second subscription request is made, a fresh subscribe call is propagated up to the source, and the source starts emitting from the beginning once again.

On its face, this seems problematic, and if you only ever connected to observables that emitted synchronously, it would be. Shared observables are designed to be used with asynchronous sources, i.e. sources that don't emit immediately, which gives you as the application developer the rest of the current turn of the event loop to connect other observers and properly take advantage of the reference counting behavior. But what about scenarios where the source is generally asynchronous but does emit immediately, or where you want to make sure sibling observers that subscribe on subsequent ticks don't miss any events? Is it possible to combine the power of ReplaySubjects with a shared source? Of course the answer is yes! You could code it manually, but RxJS offers a convenient shortcut: the shareReplay operator.

The shareReplay operator is used exactly like the share operator except that, just like a ReplaySubject, it takes a cache size. As you would expect, it caches emissions from the source and replays them to sinks that subscribe after the emissions were originally made.

function runAhhhWereDoneWithTestsTest () {

  test('Perfect...', assert => {
    // Capture all values emitted from source stream...
    const sourceOut = []
    const sharedFromArray$ = fromArray$.do(
      x => {
        sourceOut.push(x)
      }
    ).shareReplay(1)


    const out1 = []
    const out2 = []

    assert.plan(4)
    // Only take 3 of the 4 values before completing...
    const subscription1 = sharedFromArray$.take(3).subscribe(
      x => {
        out1.push(x)
      },
      undefined,
      () => {
        assert.deepEqual(out2, [], 'this has not been filled in yet...')
        assert.deepEqual(out1, [3, 4, 5], 'this comes from first subscription...')
      }
    )

    // Take 1 of the values coming down the pipe...
    const subscription2 = sharedFromArray$.take(1).subscribe(
      x => {
        out2.push(x)
      },
      undefined,
      () => {
        assert.deepEqual(sourceOut, [3, 4, 5], 'this proves the source was NOT started a second time...')
        assert.deepEqual(out2, [5], 'Ah, this is just right...')

      }
    )
  })
}

runAhhhWereDoneWithTestsTest()

Note that shareReplay is smart: if I had asked for more than one value from the second subscription's "take", it would have triggered a second connect to the source stream, but in this case it can fulfill the requested number of values from its internal cache, so it doesn't reconnect.

Share and shareReplay are the most common ways people turn regular/cold observables hot (into observables that don't send a fresh subscribe call to the source for each new subscriber). Spending some time understanding how they operate will definitely help when you're seeing repeated values coming from your streams, and should make you more productive when creating reactive applications.

@brucou
brucou commented Jan 24, 2016

Which version of RxJS did you perform your tests with? I do not seem to reproduce the same behaviour in one case. Do you have working jsfiddle(s) for the aforedescribed study? Here is my test : http://jsfiddle.net/0pwjruL9/3/. This is one subject source with two observers, one completing after receiving two values. In this test, the subject continues to send values to the remaining observer. In a similar test, you show a different behaviour.
