@mokargas
Last active March 14, 2017 03:30
Rx.js Notes

Hot takes:

  • Consider 'RxJS' as being Lodash for events
  • Combines the Observer Pattern with the Iterator pattern and functional programming with collections to fill the need for an ideal way of managing sequences of events.

What is the Observer pattern

From Wikipedia: "The observer pattern is a software design pattern in which an object, called the subject, maintains a list of its dependents, called observers, and notifies them automatically of any state changes, usually by calling one of their methods."

We can mock this in JS in a pseudo-codey way


//Some Dependent object, class, function
function Dependent(name, message) {
  this.name = name
  this.message = message
  this.update = (message) => {
    this.message = message
    console.log(message, ''+this.name)
  }
}

//The Subject object maintains a list of dependents.
let Subject = {
  dependents: []
}

Subject.subscribe = (dependent) => {
  Subject.dependents.push(dependent)
}

Subject.unsubscribe = (dependent) => {
  //Keep every dependent except the one being removed
  Subject.dependents = Subject.dependents.filter(item => item !== dependent)
}

//Some sort of update action
Subject.update = (message) => {

  //Notify dependents
  Subject.dependents.forEach(item => item.update(message))
}

//Make some dependents and subscribe to Subject updates
let john = new Dependent('John', 'I love purple')
Subject.subscribe(john)

let fred = new Dependent('Fred', 'I love green')
Subject.subscribe(fred)

//Update the subject
Subject.update('I command you')
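
A minimal sketch of the unsubscribe side of the pattern. createSubject and the two plain-object dependents are hypothetical names made up for this illustration; the point is only that a removed dependent stops being notified.

```javascript
// Hypothetical factory (an assumption for this sketch): each subject owns
// its own private list of dependents.
function createSubject() {
  let dependents = []
  return {
    subscribe(dependent) {
      dependents.push(dependent)
    },
    unsubscribe(dependent) {
      // Keep everyone except the dependent being removed
      dependents = dependents.filter(item => item !== dependent)
    },
    update(message) {
      dependents.forEach(item => item.update(message))
    }
  }
}

const received = []
const john = { update: message => received.push('John: ' + message) }
const fred = { update: message => received.push('Fred: ' + message) }

const subject = createSubject()
subject.subscribe(john)
subject.subscribe(fred)
subject.update('first')

subject.unsubscribe(fred)
subject.update('second') // only John is still listening

console.log(received)
// ['John: first', 'Fred: first', 'John: second']
```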

What is the Iterator pattern

From Wikipedia: "In object-oriented programming, the iterator pattern is a design pattern in which an iterator is used to traverse a container and access the container's elements. The iterator pattern decouples algorithms from containers; in some cases, algorithms are necessarily container-specific and thus cannot be decoupled."

Basically this means a way to access the elements (or properties) of an aggregate object sequentially without exposing or dealing with the underlying structure.


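//In ES6 a for-of loop is our basic introduction. It works on any iterable (Map, Array, Set, String...)

const someMap = new Map([['a', 1], ['b', 2]])
for (let [key, value] of someMap){
  console.log(key + ' = ' + value);
}

//Note there's no exposing of the structure. The same loop works on an array of pairs:
const table = [['a', 1], ['b', 2]]
for (let [key, value] of table){
  console.log(key + ' = ' + value);
}

//The results depend on the object's iterator implementation, i.e. its Symbol.iterator method.
//Plain objects are not iterable by default: use Object.entries(obj) with for-of, or for-in, for those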
See here: https://strongloop.com/strongblog/introduction-to-es6-iterators/ for more depth
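
To make the Symbol.iterator point concrete, here is a small sketch. The Table class is hypothetical (not from the linked article); it stores data in two parallel arrays, but for-of never sees that.

```javascript
// Hypothetical container: keeps keys and values in two parallel arrays,
// but callers only ever see [key, value] pairs through the iterator.
class Table {
  constructor() {
    this.keys = []
    this.values = []
  }
  set(key, value) {
    this.keys.push(key)
    this.values.push(value)
    return this
  }
  // for-of calls this generator method to obtain an iterator
  *[Symbol.iterator]() {
    for (let i = 0; i < this.keys.length; i++) {
      yield [this.keys[i], this.values[i]]
    }
  }
}

const demoTable = new Table().set('a', 1).set('b', 2)
const lines = []
for (const [key, value] of demoTable) {
  lines.push(key + ' = ' + value)
}
console.log(lines) // ['a = 1', 'b = 2']
```

Swapping the two arrays for a Map internally would not change the loop at all, which is the decoupling the Wikipedia quote is describing.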

Basic Rx.js Examples

//Instead of your basic event listeners:

let button =  document.querySelector('button')
Rx.Observable.fromEvent(button, 'click').subscribe(()=> console.log('clicked'));

Purity

Rx leans on pure functions to produce values, which makes code less error-prone. The idea here is to isolate state.

//Ye olde-school. State (count) is mutated
var count = 0;
var button = document.querySelector('button');
button.addEventListener('click', () => console.log(`Clicked ${++count} times`));


//Same idea as before, but with pure functions. Note this isolates state away
Rx.Observable.fromEvent(button, 'click')
.scan(count => count + 1, 0)
.subscribe(count => console.log(`Clicked ${count} times`))

//scan works like reduce for arrays. It takes the accumulated value and exposes it to the callback. The callback's return value becomes the accumulated value exposed the next time the callback runs
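
A rough array-based analogy for the scan/reduce comparison. scanArray is a hypothetical helper written for this note, not an RxJS function: reduce gives only the final total, while a scan also exposes every intermediate total.

```javascript
// Hypothetical array-based stand-in for scan: like reduce, but it keeps
// every intermediate accumulator instead of only the final one.
function scanArray(items, accumulate, seed) {
  const results = []
  let acc = seed
  for (const item of items) {
    acc = accumulate(acc, item)
    results.push(acc)
  }
  return results
}

const clicks = ['click', 'click', 'click']
const total = clicks.reduce(count => count + 1, 0)
const running = scanArray(clicks, count => count + 1, 0)

console.log(total)   // 3
console.log(running) // [1, 2, 3]
```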

Flow

Rx contains operators to control how events flow through Observables.

//Example from Rx docs
let count = 0
let rate = 1000
let lastClick = Date.now() - rate

const button = document.querySelector('button')
button.addEventListener('click', () =>{
    if(Date.now() - lastClick >= rate){
      console.log(`Clicked ${++count} times`);
      lastClick = Date.now()
    }
  })


//Using Rx operators

const button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .throttleTime(1000)
  .scan(count => count + 1, 0)
  .subscribe(count => console.log(`Clicked ${count} times`));
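
To see which clicks survive a 1000 ms throttle, here is a sketch that applies the same rule as the hand-rolled listener above to a list of timestamps. throttleTimestamps and the sample click times are assumptions for illustration; this mimics the behaviour only and is not how RxJS implements throttleTime.

```javascript
// Hypothetical helper: given event timestamps in ms, keep the ones that
// arrive at least `rate` ms after the previously accepted event.
function throttleTimestamps(timestamps, rate) {
  const passed = []
  let lastPassed = -Infinity
  for (const time of timestamps) {
    if (time - lastPassed >= rate) {
      passed.push(time)
      lastPassed = time
    }
  }
  return passed
}

const clickTimes = [0, 200, 900, 1000, 1500, 2100]
const accepted = throttleTimestamps(clickTimes, 1000)
console.log(accepted) // [0, 1000, 2100]
```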

Values

Values can be transformed as they pass through Observables.


//Example from RX docs
var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', (event) => {
  if (Date.now() - lastClick >= rate) {
    count += event.clientX;
    console.log(count)
    lastClick = Date.now();
  }
});

//Rx version
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .throttleTime(1000)
  .map(event => event.clientX)
  .scan((count, clientX) => count + clientX, 0)
  .subscribe(count => console.log(count));

Observable

  • "Observables are lazy Push collections of multiple values."
  • "Subscribing to an Observable is analogous to calling a Function."
  • "Observables are able to deliver values either synchronously or asynchronously."
  • "What is the difference between an Observable and a function? Observables can "return" multiple values over time, something which functions cannot."

let observable = Rx.Observable.create((observer)=>{
  observer.next(1)
  observer.next(2)
  observer.next(3)

  //Lazy push another after a second
  setTimeout(() => {
     observer.next(4);
     observer.complete();
   }, 1000);
 });

 //Then we have to subscribe to get updates:
console.log('just before subscribe');
observable.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
console.log('just after subscribe');

Examples for the difference between Observables and regular functions

//A regular function returns only one value. Note the return statements
function foo() {
  console.log('Hello');
  return 42;
  return 100; // dead code. will never happen
}

//But an Observable can do this:
var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100); // "return" another value
  observer.next(200); // "return" yet another
  observer.next(230); // "return" yet another
});


//Synchronous output
console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

//Or we can return multiple values asynchronously. Note timeout
var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100);
  observer.next(200);
  setTimeout(() => {
    observer.next(300); // happens asynchronously
  }, 1000);
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

  • func.call() means "give me one value synchronously"
  • observable.subscribe() means "give me any amount of values, either synchronously or asynchronously"
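
A toy model of the "lazy, and possibly synchronous" behaviour, with no Rx involved. createObservable is a hypothetical stand-in for Rx.Observable.create that simply stores the function and runs it on subscribe; real Observables do much more.

```javascript
// Hypothetical stand-in for Rx.Observable.create: it just stores the
// subscribe function and runs it each time subscribe is called.
function createObservable(subscribeFn) {
  return {
    subscribe(next) {
      subscribeFn({ next })
    }
  }
}

const log = []
const foo = createObservable(observer => {
  log.push('Hello')
  observer.next(42)
  observer.next(100)
})

// Nothing has run yet: the body only executes on subscribe
log.push('before')
foo.subscribe(x => log.push(x))
log.push('after')

console.log(log) // ['before', 'Hello', 42, 100, 'after']
```

The values land between 'before' and 'after', which is the synchronous case; wrapping a next call in setTimeout would push that value after 'after'.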

Main Observable Concerns

  • Creating Observables, via Rx.Observable.create or a creation operator
  • Subscribing to Observables with an Observer of some kind
  • Executing the Observable, to deliver 'next' / 'error' / 'complete' notifications to the Observer
  • Disposing of Observable Executions

Creating Observables

  • Rx.Observable.create is an alias for the Observable constructor and takes one argument: the subscribe function
  • Observables can be created with create, but usually we use the so-called creation operators, like of, from, interval, etc.

The following example creates an Observable that emits the string 'hi' every second to an Observer.

var observable = Rx.Observable.create(function subscribe(observer) {
  var id = setInterval(() => {
    observer.next('hi')
  }, 1000);
});

Subscribing to Observables

  • An observable can be subscribed to like observable.subscribe(x => console.log(x));
  • Subscribing to an Observable is like calling a function while providing callbacks to where the data will be delivered to
  • observable.subscribe and the subscribe in Observable.create(function subscribe(observer) {...}) share a name on purpose. They are different in the library, but conceptually equivalent
  • Very different to addEventListener/removeEventListener. The Observer is not registered as a listener
  • The Observable does not even maintain a list of attached Observers
  • A subscribe call is simply a way to start Observable execution and deliver values or events to an Observer of that execution
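
A sketch of the "no list of Observers" idea. createObservable is again a hypothetical stand-in, not the Rx implementation: every subscribe call just runs the subscribe function again for that one Observer.

```javascript
// Hypothetical stand-in for Rx.Observable.create.
// Note: no array of observers is kept anywhere.
function createObservable(subscribeFn) {
  return { subscribe: observer => subscribeFn(observer) }
}

let executions = 0
const observable = createObservable(observer => {
  executions += 1
  observer.next('run #' + executions)
})

const seenByA = []
const seenByB = []
observable.subscribe({ next: x => seenByA.push(x) })
observable.subscribe({ next: x => seenByB.push(x) })

console.log(executions) // 2
console.log(seenByA)    // ['run #1']
console.log(seenByB)    // ['run #2']
```

Contrast this with the Subject in the Observer pattern section, which pushes one update to everyone on its list.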

Executing Observables

  • The code inside Observable.create(function subscribe(observer) {...}) represents an Observable Execution
  • A lazy computation that only happens for each Observer that subscribes.
  • Execution produces multiple values over time, either synchronously or asynchronously

Three types of values an Observable Execution can deliver:

  • Next notification: sends a value such as a primitive or object (most important type). Zero to infinitely many next notifications may be delivered
  • Error notification: sends a JS error or exception
  • Complete notification: does not send any value

If either an error or complete notification is delivered, nothing else can be delivered after.

Examples:

var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

//observer.next(4) is never delivered
var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
  observer.next(4); // Is not delivered because it would violate the contract
});
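
One way to picture how the contract could be enforced is a wrapper that stops forwarding after the first error or complete. guardObserver is a hypothetical helper for this note; it is only a sketch of the idea, not the library's actual mechanism.

```javascript
// Hypothetical wrapper: forwards notifications to a plain observer until
// the first error or complete, then silently drops everything else.
function guardObserver(observer) {
  let stopped = false
  return {
    next(value) {
      if (!stopped) observer.next(value)
    },
    error(err) {
      if (stopped) return
      stopped = true
      observer.error(err)
    },
    complete() {
      if (stopped) return
      stopped = true
      observer.complete()
    }
  }
}

const delivered = []
const guarded = guardObserver({
  next: x => delivered.push(x),
  error: err => delivered.push('error: ' + err),
  complete: () => delivered.push('done')
})

guarded.next(1)
guarded.next(2)
guarded.complete()
guarded.next(4) // dropped: complete was already delivered

console.log(delivered) // [1, 2, 'done']
```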

Note: It is recommended to wrap any code within the subscribe function in a try-catch block that delivers an error notification if it catches an exception.
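
A minimal sketch of that try-catch shape. The subscribe function is called directly with a plain observer object here so it runs without Rx, and the thrown 'boom' error is a made-up stand-in for real code that fails.

```javascript
// Hypothetical subscribe function: any exception thrown while producing
// values is turned into an error notification for the observer.
function subscribe(observer) {
  try {
    observer.next(1)
    observer.next(2)
    throw new Error('boom') // stands in for real code that fails
  } catch (err) {
    observer.error(err)
  }
}

const notifications = []
subscribe({
  next: x => notifications.push('next ' + x),
  error: err => notifications.push('error ' + err.message),
  complete: () => notifications.push('done')
})

console.log(notifications) // ['next 1', 'next 2', 'error boom']
```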

Disposing Observable Executions

  • Observable Executions (OEs) can be infinite, and it's common for an Observer to want to abort execution.
  • Each execution is exclusive to one Observer only.
  • We need a way to cancel an OE to avoid wasting computation or memory (unlike Promises, which have no true cancel API)
  • When observable.subscribe is called, the Observer gets attached to the newly created Observable execution.
  • The call returns an object called the Subscription, e.g. let subscription = observable.subscribe(x => console.log(x))
  • The Subscription represents the ongoing execution. Calling subscription.unsubscribe() cancels it.
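
A sketch of cancelling an execution, without Rx. hiEvery and the closed flag are assumptions for this illustration: subscribe starts a timer and returns an object whose unsubscribe method stops it.

```javascript
// Hypothetical stand-in for an Observable that emits 'hi' on an interval.
// subscribe returns a subscription object whose unsubscribe stops the timer.
function hiEvery(ms) {
  return {
    subscribe(next) {
      const id = setInterval(() => next('hi'), ms)
      const subscription = {
        closed: false,
        unsubscribe() {
          clearInterval(id) // release the timer so nothing keeps running
          subscription.closed = true
        }
      }
      return subscription
    }
  }
}

const greetings = []
const subscription = hiEvery(1000).subscribe(x => greetings.push(x))

// Cancel right away: the interval never gets a chance to fire
subscription.unsubscribe()

console.log(subscription.closed, greetings.length) // true 0
```

Without the unsubscribe call the interval would keep firing forever, which is the waste the bullets above are talking about.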