RxJS stands for Reactive Extensions for JavaScript. It’s a library for working with asynchronous data streams, known as observables. These data streams can be created from many sources, for example UI events, HTTP requests, file systems, array-like objects or memory/cache. A stream is a sequence of ongoing events ordered in time. It can emit a value, an error or a completed signal. To work with these streams, we need to observe what they emit, and this is where RxJS observables come in.
Observables are like watchers on the stream: they call a function whenever the stream produces a value, an error or a completed signal. Observers subscribe to an observable to receive these notifications. Observables are constantly watching the data stream. RxJS also comes with a bunch of operators that allow us to filter, compose, select, combine and transform observables.
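To make the idea of operators a bit more concrete, here is a minimal sketch in plain JavaScript. It is not how RxJS is implemented: createStream, filterStream and mapStream are hypothetical helpers invented for this illustration, and the sketch assumes a synchronous stream with no error handling. The only point is that an operator takes one stream and returns a new one.

```javascript
// Hypothetical stand-in for an observable: an object with a subscribe method
// that pushes values to an observer and then signals completion.
function createStream(values) {
  return {
    subscribe(observer) {
      values.forEach((value) => observer.next(value));
      observer.complete();
    }
  };
}

// Hypothetical "filter" operator: returns a new stream that only forwards
// the values for which the predicate returns true.
function filterStream(source, predicate) {
  return {
    subscribe(observer) {
      source.subscribe({
        next: (value) => { if (predicate(value)) observer.next(value); },
        complete: () => observer.complete()
      });
    }
  };
}

// Hypothetical "map" operator: returns a new stream that forwards
// the transformed version of every value.
function mapStream(source, transform) {
  return {
    subscribe(observer) {
      source.subscribe({
        next: (value) => observer.next(transform(value)),
        complete: () => observer.complete()
      });
    }
  };
}

const received = [];
const numbers$ = createStream([1, 2, 3, 4, 5]);
const evenTimesTen$ = mapStream(filterStream(numbers$, (n) => n % 2 === 0), (n) => n * 10);

evenTimesTen$.subscribe({
  next: (value) => received.push(value),
  complete: () => received.push('completed')
});

console.log(received); // [ 20, 40, 'completed' ]
```

The real RxJS operators cover far more (errors, unsubscription, asynchronous sources), but the shape is the same: each operator wraps the stream before it.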
I am not adding the steps to set up an RxJS environment. You can follow the steps from this link RxJS installation guide or you can use this boilerplate to get started. Observables can be created in different ways. I will cover a basic implementation of each type.
When we create an observable from an event, it listens to that event. We use the fromEvent operator for this. Suppose we have a button and we want to observe its click event.
<!--index.html-->
...
<button id="btn">Click</button>
...
//app.js
import Rx from 'rxjs/Rx';
const btn = $('#btn'); // assumes jQuery is loaded on the page
//Creating observable
const btnStream$ = Rx.Observable.fromEvent(btn, 'click');
//Subscribing to the observable
btnStream$.subscribe(
  (event) => { //called on every click
    console.log('Button clicked');
  },
  (error) => { //error callback
    console.log(error);
  },
  () => { //callback to signal the stream has completed
    console.log('completed');
  });
//Output after clicking the button
//Button clicked
In the above code I have created a variable btnStream$ for the observable. Notice the $ sign at the end. It is not required, but it's a good practice because it tells us which of our variables are actually observables.
Now let's add another event:
<!--index.html-->
...
<input id="input" type="text">
...
<button id="btn">Click me</button>
...
//app.js
import Rx from 'rxjs/Rx';
const btn = $('#btn'); // assumes jQuery is loaded on the page
const input = $('#input');
//Creating observables
const btnStream$ = Rx.Observable.fromEvent(btn, 'click');
const inputStream$ = Rx.Observable.fromEvent(input, 'keyup');
//Subscribing to the button observable
btnStream$.subscribe(
  (event) => { //called on every click
    console.log('Clicked');
  },
  (error) => { //error callback
    console.log(error);
  },
  () => { //callback to signal the stream has completed
    console.log('completed');
  });
//Subscribing to the input observable with only the first callback
inputStream$.subscribe((event) => {
  console.log(event.target.value); //logging the current value of input
});
// Output after clicking the button
// Clicked
// Output after entering value eg 'hello' in the input
// h
// he
// hel
// hell
// hello
If you look at the above code, I haven’t passed the second and third callbacks while subscribing to the input observable. This is because only the first callback is needed; the other two arguments of the subscribe function are optional.
Also, when you run this code you will notice that the complete callback is never called, since the event is an ongoing process that has not ended.
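Why the complete callback never fires can be sketched without RxJS. The helper below, fromEventSketch, is a hypothetical name made up for this illustration and is not how RxJS implements fromEvent. It assumes an environment where EventTarget and Event are available globally (browsers and recent Node.js versions). An event source has no natural end, so the helper only ever calls the first callback.

```javascript
// Hypothetical helper: wraps an event target in a minimal observable-like object.
function fromEventSketch(target, eventName) {
  return {
    subscribe(next, error, complete) {
      // Only `next` is ever called: an event source has no "last" event,
      // so nothing here would trigger `error` or `complete`.
      const handler = (event) => next(event);
      target.addEventListener(eventName, handler);
      // Return a function that stops listening.
      return () => target.removeEventListener(eventName, handler);
    }
  };
}

const button = new EventTarget(); // stand-in for a real DOM button
const log = [];

const stopListening = fromEventSketch(button, 'click').subscribe(
  (event) => log.push('clicked: ' + event.type),
  (error) => log.push('error'),
  () => log.push('completed')
);

button.dispatchEvent(new Event('click'));
button.dispatchEvent(new Event('click'));
stopListening();
button.dispatchEvent(new Event('click')); // ignored, we are no longer listening

console.log(log); // [ 'clicked: click', 'clicked: click' ]
```

Notice that 'completed' never shows up in the log, no matter how many clicks are dispatched.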
We can also create observables from data structures like arrays, maps, sets etc. We simply need to use the from operator instead of fromEvent. Example:
//app.js
import Rx from 'rxjs/Rx';
const array = [1, 2, 3, 4, 5];
//Creating observable from the array
const arrayStream$ = Rx.Observable.from(array);
arrayStream$.subscribe(
  (element) => { //called once per array element
    console.log(element);
  },
  (error) => { //error callback
    console.log(error);
  },
  () => { //callback to signal the stream has completed
    console.log('completed');
  });
// Output
// 1
// 2
// 3
// 4
// 5
// completed
Notice we get the completed log with this one. This is because it is a static array, so the stream completes as soon as it reaches the last element.
Let's try it out with a Set as well:
//app.js
import Rx from 'rxjs/Rx';
const set = new Set(['string', 25, {text: 'some text'}]);
//Creating observable from the set
const setStream$ = Rx.Observable.from(set);
setStream$.subscribe(
  (element) => { //called once per set element
    console.log(element);
  },
  (error) => { //error callback
    console.log(error);
  },
  () => { //callback to signal the stream has completed
    console.log('completed');
  });
// Output
// string
// 25
// {text: 'some text'}
// completed
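Since from walks whatever the data structure yields when it is iterated, plain JavaScript iteration gives a good preview of what a subscriber should receive. The sketch below uses no RxJS at all, and the variable names are made up for illustration. Pay attention to the Map case: iterating a Map yields [key, value] pairs, so that is the shape I would expect each emitted element to have.

```javascript
// Iterating a Set yields its unique elements in insertion order.
const tags = new Set(['a', 'b', 'a']); // the duplicate 'a' is dropped by Set
const tagElements = Array.from(tags);

// Iterating a Map yields [key, value] pairs in insertion order.
const user = new Map([['id', 1], ['name', 'Rx']]);
const userElements = Array.from(user);

console.log(tagElements);  // [ 'a', 'b' ]
console.log(userElements); // [ [ 'id', 1 ], [ 'name', 'Rx' ] ]
```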
There are many cases when we want our own custom observable. We can do that by creating and configuring the observable ourselves.
So to create a custom observable:
import Rx from 'rxjs/Rx';
//The function passed to the constructor runs once for every subscriber
const source$ = new Rx.Observable(observer => {
  console.log('Creating observable');
});
source$.subscribe(
  (x) => { //called for every emitted value
    console.log(x);
  },
  (error) => { //error callback
    console.log(error);
  },
  () => { //callback to signal completed
    console.log('completed');
  });
// Output
// Creating observable
In the above example none of the subscribe callbacks will be invoked, because the observable never emits anything. We need to do that explicitly.
Let's update the code:
import Rx from 'rxjs/Rx';
const source$ = new Rx.Observable(observer => {
  console.log('Creating observable');
  observer.next('Hello'); //emits a value
  observer.complete(); //we need to tell the observer when the stream is complete
});
source$.subscribe(
  (x) => { //called for every emitted value
    console.log(x);
  },
  (error) => { //error callback
    console.log(error);
  },
  () => { //callback to signal completed
    console.log('completed');
  });
// Output
// Creating observable
// Hello
// completed
We can also create observables from a promise, so that the observable watches for the promise to be resolved. For this we will use the fromPromise operator.
We will create a promise and then subscribe to it:
import Rx from 'rxjs/Rx';
//creating a promise
const myPromise = new Promise((resolve, reject) => {
  console.log('creating promise');
  setTimeout(() => { // just to make it a dummy promise
    resolve('resolving promise');
  }, 3000); //3 sec timeout
});
//creating observable from the promise
const myPromiseStream$ = Rx.Observable.fromPromise(myPromise);
myPromiseStream$.subscribe(
  (x) => { //called with the resolved value
    console.log(x);
  },
  (error) => { //error callback, called if the promise is rejected
    console.log(error);
  },
  () => { //callback to signal completed
    console.log('completed');
  });
// Output
// creating promise
// resolving promise
// completed
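What happens between the promise and the three callbacks can be approximated in a few lines. fromPromiseSketch below is a hypothetical helper written for this post, not the RxJS implementation. The assumption it illustrates: when the promise resolves, the value is emitted and the stream completes right after, and when the promise rejects, the error callback is called instead.

```javascript
// Hypothetical helper: adapts a promise to the three subscribe callbacks.
function fromPromiseSketch(promise) {
  return {
    subscribe(next, error, complete) {
      // Return the chained promise so callers can tell when delivery is done.
      return promise.then(
        (value) => {
          next(value); // a promise settles once, so exactly one value is emitted
          complete();  // and the stream is finished right after it
        },
        (reason) => error(reason)
      );
    }
  };
}

const log = [];
const subscribeAndLog = (promise) =>
  fromPromiseSketch(promise).subscribe(
    (value) => log.push('next: ' + value),
    (reason) => log.push('error: ' + reason),
    () => log.push('completed')
  );

// `done` resolves with the log once both subscriptions have been served.
const done = Promise.all([
  subscribeAndLog(Promise.resolve('resolved value')),
  subscribeAndLog(Promise.reject('rejected reason'))
]).then(() => {
  console.log(log);
  // [ 'next: resolved value', 'completed', 'error: rejected reason' ]
  return log;
});
```

A rejected promise never reaches the completed callback in this sketch; it goes to the error callback only.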
Let's do a slightly more practical example.
import Rx from 'rxjs/Rx';
//function that takes a username, fetches the user's data from Github and returns it as a promise
function getUser(username){
  return $.ajax({ // assumes jQuery is loaded on the page
    url: 'https://api.github.com/users/'+username,
    dataType: 'jsonp'
  }).promise();
}
Rx.Observable.fromPromise(getUser('ZenabKhan'))
  .subscribe(
    (x) => { //called once the request has finished
      console.log(x.data.name); //data received from Github
    });
// Output
// Zenab Saleem
So we created a promise that fetches a Github user's data and subscribed to it via an observable. When the data becomes available, we log it; in the above case, the user's name.
That's it for now. I hope it was easy to grasp. Please do share your feedback or suggestions.