@gajus
Last active May 18, 2017 14:50

http://jhusain.github.io/learnrx/

Like an Event, an Observable is a sequence of values that a data producer pushes to the consumer. However, unlike an Event, an Observable can signal to a listener that it has completed and will send no more data.

Querying Arrays only gives us a snapshot. By contrast, querying Observables allows us to create data sets that react and update as the system changes over time. This enables a very powerful type of programming known as reactive programming.

Disposing of a Subscription object unsubscribes from the event and prevents memory leaks. Disposing of a subscription is the asynchronous equivalent of stopping half-way through a counting for loop.
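
A minimal sketch of that idea, using a hand-rolled event source rather than the Rx library. The names createSource(), emit() and listenerCount() are assumptions made up for this illustration; only forEach() and dispose() mirror the vocabulary used in these notes.

```javascript
// Hypothetical, hand-rolled event source (not the Rx API).
function createSource() {
    var listeners = [];

    return {
        // Register a listener and return a Subscription-like object.
        forEach: function (onNext) {
            listeners.push(onNext);

            return {
                dispose: function () {
                    var index = listeners.indexOf(onNext);

                    if (index !== -1) {
                        listeners.splice(index, 1);
                    }
                }
            };
        },
        // Push a value to every listener that is still subscribed.
        emit: function (value) {
            listeners.slice().forEach(function (listener) {
                listener(value);
            });
        },
        listenerCount: function () {
            return listeners.length;
        }
    };
}

var source = createSource();
var received = [];
var subscription = source.forEach(function (value) {
    received.push(value);
});

source.emit(1);
source.emit(2);
subscription.dispose();
source.emit(3); // Not delivered: the listener has been removed.

console.log(received); // [1, 2]
```

After dispose() the source no longer holds a reference to the listener, which is what prevents the leak.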

If we convert Events to Observable Objects, we can use powerful functions to transform them.

The take() function creates a new sequence that completes after a fixed number of items arrive. When an Observable sequence completes, it unsubscribes all of its listeners.

Observable's takeUntil() function is a convenient way of completing a sequence when another Event occurs.

  • We can traverse Observables using forEach().
  • We can use fromEvent() to convert Events into Observables that never complete.
  • We can apply take() and takeUntil() to an Observable to create a new sequence which does complete.
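
The three points above can be sketched with a tiny hand-rolled Observable. This is not the Rx implementation: the Observable constructor and createEventSource() (a stand-in for fromEvent()) are assumptions for illustration, and forwarding of errors and completion from the source is left out to keep the sketch short.

```javascript
// Hypothetical minimal Observable (not the Rx implementation).
// `subscribe` receives an observer and returns a dispose function.
function Observable(subscribe) {
    this.forEach = function (onNext, onError, onCompleted) {
        var noop = function () {};
        var dispose = subscribe({
            onNext: onNext,
            onError: onError || noop,
            onCompleted: onCompleted || noop
        });

        return { dispose: dispose };
    };
}

// Stand-in for fromEvent(): never completes; emit() pushes a value.
function createEventSource() {
    var observers = [];

    return {
        emit: function (value) {
            observers.slice().forEach(function (o) { o.onNext(value); });
        },
        observable: new Observable(function (observer) {
            observers.push(observer);

            return function () {
                var index = observers.indexOf(observer);
                if (index !== -1) { observers.splice(index, 1); }
            };
        })
    };
}

// Complete after `count` items (assumes count >= 1 and a source that
// emits only after forEach() has returned, as event sources do).
Observable.prototype.take = function (count) {
    var source = this;

    return new Observable(function (observer) {
        var remaining = count;
        var subscription = source.forEach(function (value) {
            if (remaining === 0) { return; }
            remaining -= 1;
            observer.onNext(value);
            if (remaining === 0) {
                observer.onCompleted();
                subscription.dispose();
            }
        });

        return function () { subscription.dispose(); };
    });
};

// Complete as soon as `stopper` emits anything.
Observable.prototype.takeUntil = function (stopper) {
    var source = this;

    return new Observable(function (observer) {
        var sourceSub = source.forEach(function (value) {
            observer.onNext(value);
        });
        var stopSub = stopper.forEach(function () {
            observer.onCompleted();
            disposeAll();
        });

        function disposeAll() {
            sourceSub.dispose();
            stopSub.dispose();
        }

        return disposeAll;
    });
};

var clicks = createEventSource();
var stop = createEventSource();
var log = [];

clicks.observable.take(2).forEach(
    function (value) { log.push('take: ' + value); },
    null,
    function () { log.push('take: completed'); });

clicks.observable.takeUntil(stop.observable).forEach(
    function (value) { log.push('takeUntil: ' + value); },
    null,
    function () { log.push('takeUntil: completed'); });

clicks.emit('a');
clicks.emit('b');    // take(2) completes here
clicks.emit('c');
stop.emit('now');    // takeUntil() completes here
clicks.emit('d');    // nobody is listening any more
```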

Observable.create() is powerful enough to convert any asynchronous API into an Observable. Observable.create() relies on the fact that all asynchronous APIs have the following semantics:

  1. The client needs to be able to receive data.
  2. The client needs to be able to receive error information.
  3. The client needs to be able to be alerted that the operation is complete.
  4. The client needs to be able to indicate that they're no longer interested in the result of the operation.

The function passed to Observable.create() is the definition of the forEach() function for this Observable. The function we pass to Observable.create() accepts only one value: an Observer. An Observer is just a triple of handlers:

  • The onNext() handler used to send data to the client.
  • The onError() handler used to send error information to the client.
  • The onCompleted() handler used to inform the client that the sequence has completed.

The function we pass to Observable.create() returns a function that defines the dispose() method of the Subscription object used during traversal. As with the Observer, Observable.create() creates the Subscription object for us and uses our returned function as the dispose() definition.

var getJSON,
    subscription;

getJSON = function (url) {
    return Observable.create(function (observer) {
        var subscribed = true;

        // $.getJSON() takes (url, data, success) and has no error option,
        // so use $.ajax() to register both callbacks.
        $.ajax({
            url: url,
            dataType: 'json',
            success: function (data) {
                // If client is still interested in the results, send them.
                if (subscribed) {
                    // Send data to the client
                    observer.onNext(data);
                    // Immediately complete the sequence
                    observer.onCompleted();
                }
            },
            error: function (jqXHR, textStatus, errorThrown) {
                // If client is still interested in the results, report the failure.
                if (subscribed) {
                    // Inform the client that an error occurred.
                    observer.onError(errorThrown || textStatus);
                }
            }
        });

        // Definition of the Subscription object's dispose() method.
        return function () {
            subscribed = false;
        };
    });
};

subscription = getJSON('http://api-global.netflix.com/abTestInformation')
    .forEach(
        // observer.onNext()
        function (data) {
            alert(JSON.stringify(data));
        },
        // observer.onError()
        function (err) {
            alert(err);
        },
        // observer.onCompleted()
        function () {
            alert('The asynchronous operation has completed.');
        });
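
To make the wiring concrete, here is a minimal sketch of how a create()-style function could connect the three handlers and the returned dispose function. It is an assumption-laden stand-in, not the Rx implementation: create(), request() and the URL are hypothetical, and the callback is triggered by hand so the example stays synchronous.

```javascript
// Hypothetical stand-in for Observable.create(); not the Rx implementation.
function create(definition) {
    return {
        forEach: function (onNext, onError, onCompleted) {
            var stopped = false;

            // Build the Observer triple and hand it to the definition.
            var dispose = definition({
                onNext: function (value) {
                    if (!stopped) { onNext(value); }
                },
                onError: function (error) {
                    if (!stopped) {
                        stopped = true;
                        if (onError) { onError(error); }
                    }
                },
                onCompleted: function () {
                    if (!stopped) {
                        stopped = true;
                        if (onCompleted) { onCompleted(); }
                    }
                }
            });

            // The function returned by the definition becomes dispose().
            return {
                dispose: function () {
                    stopped = true;
                    if (typeof dispose === 'function') { dispose(); }
                }
            };
        }
    };
}

// Hypothetical callback-style API. It only stores the callback; the
// example invokes it manually to simulate the response arriving.
var pending = null;

function request(url, callback) {
    pending = callback;
}

var cancelled = false;
var results = [];

var response = create(function (observer) {
    request('/hypothetical/url', function (data) {
        observer.onNext(data);
        observer.onCompleted();
    });

    return function () {
        cancelled = true;
    };
});

var subscription = response.forEach(
    function (data) { results.push(data); },
    function () { results.push('error'); },
    function () { results.push('completed'); });

pending({ id: 1 });
pending({ id: 2 }); // Ignored: the sequence has already completed.
subscription.dispose();

console.log(results); // [ { id: 1 }, 'completed' ]
```

The stopped flag plays the same role as the subscribed variable in getJSON above, except that here it lives inside create() so every Observable gets it for free.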

FRP

https://medium.com/@andrestaltz/why-i-cannot-say-frp-but-i-just-did-d5ffaa23973b

Functional

  • One key feature in a functional language is the concept of first-class functions. The idea is that you can pass functions as parameters to other functions and return them as values.
  • Functional programming involves writing code that does not change state.
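
A short sketch of both points, with made-up helper names (twice, increment): a function is passed in and returned as a value, and the input array is never modified.

```javascript
// Takes a function and returns a new function that applies it twice.
function twice(fn) {
    return function (value) {
        return fn(fn(value));
    };
}

function increment(n) {
    return n + 1;
}

var addTwo = twice(increment);
var numbers = [1, 2, 3];

// map() returns a new array; `numbers` itself is left unchanged.
var shifted = numbers.map(addTwo);

console.log(shifted); // [3, 4, 5]
console.log(numbers); // [1, 2, 3]
```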

Reactive

  • The idea of streams of events which can be observed or can react to other event streams.

FRP

  • "Original FRP" refers to denotational and continuous-time functional programming using behaviors and events.
  • The term "Original FRP" has since been replaced with "denotative, continuous-time programming" (DCTP).
  • Popular "FRP" use refers to listenable, functionally composable event streams.

Reactive Programming

https://gist.github.com/staltz/868e7e9bc2a7b8c1f754

Reactive programming is programming with asynchronous data streams.

In a way, this isn't anything new. Event buses or your typical click events are really an asynchronous event stream, which you can observe and respond to with side effects. Reactive is that idea on steroids. You are able to create data streams of anything, not just from click and hover events. Streams are cheap and ubiquitous; anything can be a stream: variables, user inputs, properties, caches, data structures, etc. For example, imagine your Twitter feed as a data stream in the same fashion that click events are. You can listen to that stream and react accordingly.

The "listening" to the stream is called subscribing. The functions we are defining are observers. The stream is the subject (or "observable") being observed. This is precisely the Observer Design Pattern.

  • Example of using buffer to capture series of click events.
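
The buffering idea can be approximated offline on an array of click timestamps. This is only a sketch of the grouping logic, not the Rx buffer operator: bufferByGap(), the 250 ms quiet period and the sample timestamps are all assumptions chosen for illustration.

```javascript
// Group click timestamps (in ms) into bursts. A burst ends when the gap
// to the next click is longer than `quietMs`.
function bufferByGap(timestamps, quietMs) {
    var bursts = [];
    var current = [];

    timestamps.forEach(function (time) {
        var last = current[current.length - 1];

        if (current.length > 0 && time - last > quietMs) {
            bursts.push(current);
            current = [];
        }

        current.push(time);
    });

    if (current.length > 0) {
        bursts.push(current);
    }

    return bursts;
}

var clicks = [0, 100, 900, 1000, 1100, 2000];
var bursts = bufferByGap(clicks, 250);

// Keep only the bursts that contain two or more clicks.
var multiClicks = bursts
    .map(function (burst) { return burst.length; })
    .filter(function (length) { return length >= 2; });

console.log(bursts);      // [[0, 100], [900, 1000, 1100], [2000]]
console.log(multiClicks); // [2, 3]
```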

A Promise is simply an Observable with one single emitted value. Rx streams go beyond promises by allowing many returned values.

A metastream is a stream where each emitted value is yet another stream. flatMap() is a version of map() that "flattens" a metastream by emitting on the "trunk" stream everything that is emitted on the "branch" streams.

Marbles

http://rxmarbles.com/

Cold vs. Hot Observables

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/creating.md#cold-vs-hot-observables

Cold observables start running upon subscription, i.e., the observable sequence only starts pushing values to the observers when subscribe() is called. Values are also not shared among subscribers.

var source = Rx.Observable.interval(1000);

var subscription1 = source.subscribe(
  function (x) { console.log('Observer 1: onNext: ' + x); },
  function (e) { console.log('Observer 1: onError: ' + e.message); },
  function () { console.log('Observer 1: onCompleted'); });

var subscription2 = source.subscribe(
  function (x) { console.log('Observer 2: onNext: ' + x); },
  function (e) { console.log('Observer 2: onError: ' + e.message); },
  function () { console.log('Observer 2: onCompleted'); });

setTimeout(function () {
  subscription1.dispose();
  subscription2.dispose();
}, 5000);

// => Observer 1: onNext: 0
// => Observer 2: onNext: 0
// => Observer 1: onNext: 1
// => Observer 2: onNext: 1
// => Observer 1: onNext: 2
// => Observer 2: onNext: 2
// => Observer 1: onNext: 3
// => Observer 2: onNext: 3

This is different from hot observables such as mouse move events or stock tickers which are already producing values even before a subscription is active. When an observer subscribes to a hot observable sequence, it will get all values in the stream that are emitted after it subscribes. The hot observable sequence is shared among all subscribers, and each subscriber is pushed the next value in the sequence. For example, even if no one has subscribed to a particular stock ticker, the ticker will continue to update its value based on market movement. When a subscriber registers interest in this ticker, it will automatically receive the next tick.

console.log('Current time: ' + Date.now());

// Creates a sequence
var source = Rx.Observable.interval(1000);

// Convert the sequence into a hot sequence
var hot = source.publish();

// No value is pushed to 1st subscription at this point
var subscription1 = hot.subscribe(
  function (x) { console.log('Observer 1: onNext: %s', x); },
  function (e) { console.log('Observer 1: onError: %s', e); },
  function () { console.log('Observer 1: onCompleted'); });

console.log('Current Time after 1st subscription: ' + Date.now());

// Idle for 3 seconds
setTimeout(function () {

  // Hot is connected to source and starts pushing value to subscribers
  hot.connect();

  console.log('Current Time after connect: ' + Date.now());

  // Idle for another 3 seconds
  setTimeout(function () {

    console.log('Current Time after 2nd subscription: ' + Date.now());

    var subscription2 = hot.subscribe(
      function (x) { console.log('Observer 2: onNext: %s', x); },
      function (e) { console.log('Observer 2: onError: %s', e); },
      function () { console.log('Observer 2: onCompleted'); });

  }, 3000);
}, 3000);

// => Current time: 1382562433256
// => Current Time after 1st subscription: 1382562433260
// => Current Time after connect: 1382562436261
// => Observer 1: onNext: 0
// => Observer 1: onNext: 1
// => Current Time after 2nd subscription: 1382562439262
// => Observer 1: onNext: 2
// => Observer 2: onNext: 2
// => Observer 1: onNext: 3
// => Observer 2: onNext: 3
// => Observer 1: onNext: 4
// => Observer 2: onNext: 4
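
The same contrast can be reproduced without timers in a hand-rolled sketch. Nothing here is the Rx implementation: createClock() is a hypothetical manual clock standing in for interval(), and coldInterval() and publish() are simplified stand-ins with no disposal, errors or completion.

```javascript
// Hypothetical manual clock: tick() fires every registered callback once.
function createClock() {
    var callbacks = [];

    return {
        onTick: function (callback) { callbacks.push(callback); },
        tick: function () {
            callbacks.slice().forEach(function (callback) { callback(); });
        }
    };
}

// Cold: each subscribe() call starts its own counter at 0.
function coldInterval(clock) {
    return {
        subscribe: function (onNext) {
            var count = 0;

            clock.onTick(function () {
                onNext(count);
                count += 1;
            });
        }
    };
}

// Hot: a single shared subscription to the source, started by connect().
function publish(source) {
    var observers = [];

    return {
        subscribe: function (onNext) { observers.push(onNext); },
        connect: function () {
            source.subscribe(function (value) {
                observers.slice().forEach(function (onNext) {
                    onNext(value);
                });
            });
        }
    };
}

var log = [];

var coldClock = createClock();
var cold = coldInterval(coldClock);

cold.subscribe(function (x) { log.push('cold 1: ' + x); });
coldClock.tick();
cold.subscribe(function (x) { log.push('cold 2: ' + x); });
coldClock.tick(); // The second subscriber starts from 0.

var hotClock = createClock();
var hot = publish(coldInterval(hotClock));

hot.subscribe(function (x) { log.push('hot 1: ' + x); });
hotClock.tick();  // Nothing is pushed: not connected yet.
hot.connect();
hotClock.tick();
hot.subscribe(function (x) { log.push('hot 2: ' + x); });
hotClock.tick();  // The late subscriber joins at the current value.

console.log(log);
```

The second cold subscriber sees 0 because it got its own counter, while the second hot subscriber sees 1 because it shares the counter that was already running.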

Yarr

https://github.com/channikhabra/yarr/blob/master/complete-tutorial.org

Observable.prototype.flatMap

flatMap is like a combination of two operators: map and flatten. It maps a function over the source Observable and flattens the result, i.e. it assumes the function returns an Observable and emits that Observable's values. According to the tutorial, it can also resolve Promises and generators to values.

----val----val----val----val---------|

-----------------map-----------------

----Obs----Obs----Obs----Obs---------|

----------------flat-----------------

--newVal--newVal--newVal--newVal-----|
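
The diagram can be traced with a small synchronous sketch. The Observable constructor, fromArray() and flatten() below are hypothetical helpers written for this illustration, not the Rx API, and completion, errors and disposal are not modelled.

```javascript
// Hypothetical minimal Observable: subscribe(onNext) pushes values.
function Observable(subscribe) {
    this.subscribe = subscribe;
}

// Emit each array element in order.
Observable.fromArray = function (values) {
    return new Observable(function (onNext) {
        values.forEach(function (value) {
            onNext(value);
        });
    });
};

// val -> project(val)
Observable.prototype.map = function (project) {
    var source = this;

    return new Observable(function (onNext) {
        source.subscribe(function (value) {
            onNext(project(value));
        });
    });
};

// Flatten a metastream: subscribe to every inner Observable and forward
// its values onto the outer ("trunk") stream.
Observable.prototype.flatten = function () {
    var source = this;

    return new Observable(function (onNext) {
        source.subscribe(function (inner) {
            inner.subscribe(onNext);
        });
    });
};

// flatMap = map, then flatten.
Observable.prototype.flatMap = function (project) {
    return this.map(project).flatten();
};

var result = [];

Observable.fromArray([1, 2, 3])
    .flatMap(function (n) {
        return Observable.fromArray([n, n * 10]);
    })
    .subscribe(function (value) {
        result.push(value);
    });

console.log(result); // [1, 10, 2, 20, 3, 30]
```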

Remember:

  • Use reduce if you want the accumulated value to be emitted only once, when the posts Observable completes.
  • Use scan if you are not sure whether the Observable will complete and you want the intermediate accumulations.
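
A minimal sketch of the difference, feeding values one at a time the way a stream would. createScan() and createReduce() are hypothetical helpers for illustration, not the Rx operators.

```javascript
// Emits the running total after every value.
function createScan(accumulate, seed, onNext) {
    var state = seed;

    return {
        next: function (value) {
            state = accumulate(state, value);
            onNext(state);
        },
        complete: function () {}
    };
}

// Emits the final total once, and only on completion.
function createReduce(accumulate, seed, onNext) {
    var state = seed;

    return {
        next: function (value) {
            state = accumulate(state, value);
        },
        complete: function () {
            onNext(state);
        }
    };
}

function add(total, n) {
    return total + n;
}

var scanned = [];
var reduced = [];
var scan = createScan(add, 0, function (x) { scanned.push(x); });
var reduce = createReduce(add, 0, function (x) { reduced.push(x); });

[1, 2, 3].forEach(function (n) {
    scan.next(n);
    reduce.next(n);
});

// Before completion, only scan has produced anything.
var reducedBeforeComplete = reduced.slice();

scan.complete();
reduce.complete();

console.log(scanned);               // [1, 3, 6]
console.log(reducedBeforeComplete); // []
console.log(reduced);               // [6]
```

If the source never completes, the reduce-style accumulator never emits, which is why scan is the safer choice for open-ended streams.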