Skip to content

Instantly share code, notes, and snippets.

@wycats
Created October 22, 2013 14:07
Show Gist options
  • Save wycats/7101381 to your computer and use it in GitHub Desktop.
Save wycats/7101381 to your computer and use it in GitHub Desktop.
Connection Issues

Let's say we have a stream that represents the value of a property on some object.

var stream = new PathStream(post, 'title');

When we subscribe to this stream, we want to get its current value right away, and then any subsequent changes to the value.

In order to do this, we implement PathStream to hold onto a reference to the current value, and implement its subscribe to always emit the current value right away.

Let's say we want to map the value of that stream to an all-uppercase version of the stream, and we want the mapped stream to have the same semantics: the initial (mapped) value will be emitted when someone subscribes to the derived stream, and then they'll get more mapped values as they come.

map(stream, function(value) {
  return value.toUpperCase();
});

The problem is that map subscribes to the underlying PathStream, which immediately consumes the value, but it doesn't know to hold onto it to emit when something subscribes to the mapped stream.

You might think you can solve this by chaining a lastValue onto the end:

lastValue(map(stream, function(value) {
  return value.toUpperCase();
}));

The problem is: the map pulls the value out immediately, before lastValue has a chance to cache it.

Rx solves this problem through ConnectableObservable, which knows to avoid publishing values until the stream is actually connected.

This has several issues:

Connection is only one-level deep. If you want to map onto a connectable observable, you will need a special connectableMap that you can connect to, and which propagates the connection to its parent stream. More commonly, all of your code is in the same place, so you can connect the parent stream when you're ready. This makes decoupling more difficult, since you can no longer pass a derived stream to someone without also worrying about connecting the original stream.

Connection is per-stream, not per subscription, which makes even solutions like connectableMap problematic, because once you need to connect the stream for a particular subscriber, all of the problems come back for the next one.

A possible solution:

The crux of the issue is that chaining together operations uses the same public API (subscribe) as attempting to extract the value for some purpose.

The proposed API leaves subscribe as the chaining API, and introduces a new connect API that you use at the end of the chain to start the flow of events.

This means that all operations need to be aware of connect, so that they can propagate connect at the bottom of the chain all the way back to the original stream and cause it to send its initial value.

var title = new PathObserver(post, 'title'),
    body = new PathObserver(post, 'body');

var record = zipLatest(title, body, ([ title, body ]) => { title, body });

var rendered = map(record, function(post) {
  return html`<h1>${post.title}</h1><div>${post.body}</div>`;
});

var updateDOM = forEach(rendered, function(html) {
  // side effect
  $("#posts").html(html);
});

// Flow a request to connect all the way back up the chain, which will cause
// the two PathObservers to send their initial values, flow through zipLatest
// to convert the values into a record, map the record into HTML, and update
// the DOM.
rendered.connect();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment