@ZenabKhan
Last active March 1, 2017 00:57

Introduction to RxJS Observables: Creating Observables

Intro to RxJS

RxJS stands for Reactive Extensions for JavaScript. It is a library for working with asynchronous data streams, or observables. These streams can be created from many sources: UI events, HTTP requests, file systems, array-like objects, or memory/cache. A stream is a sequence of ongoing events ordered in time. It can emit a value, an error, or a completed signal. To work with these streams, we need to observe what they emit. This is where RxJS comes in: it lets us work with streams through Observables.

What are Observables?

Observables are like watchers on a stream: they invoke a callback whenever the stream produces a value, an error, or a completed signal. Observers subscribe to an observable to receive these notifications, and the observable keeps watching the data stream for as long as it is active. RxJS also comes with a set of operators that let us filter, compose, select, combine and transform observables.
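To make the idea of values, errors, completion and operators a bit more concrete, here is a minimal sketch in plain JavaScript. It is a toy stand-in and not how RxJS is implemented: `createObservable` is a hypothetical name, and only `map` and `filter` are sketched.

```javascript
// Toy stand-in for an observable; NOT the RxJS implementation.
// createObservable is a hypothetical helper used only for illustration.
function createObservable(producer) {
    return {
        // Every subscribe call runs the producer for that subscriber
        subscribe(next, error = () => {}, complete = () => {}) {
            let done = false; // nothing is delivered after error/complete
            producer({
                next: (value) => { if (!done) next(value); },
                error: (err) => { if (!done) { done = true; error(err); } },
                complete: () => { if (!done) { done = true; complete(); } },
            });
        },
        // Operators return a new observable built on top of this one
        map(fn) {
            return createObservable((observer) =>
                this.subscribe(
                    (value) => observer.next(fn(value)),
                    (err) => observer.error(err),
                    () => observer.complete()));
        },
        filter(predicate) {
            return createObservable((observer) =>
                this.subscribe(
                    (value) => { if (predicate(value)) observer.next(value); },
                    (err) => observer.error(err),
                    () => observer.complete()));
        },
    };
}

const log = [];
createObservable((observer) => {
    [1, 2, 3, 4].forEach((n) => observer.next(n));
    observer.complete();
    observer.next(99); // ignored: the stream has already completed
})
    .filter((n) => n % 2 === 0)
    .map((n) => n * 10)
    .subscribe(
        (value) => log.push(value),
        (err) => log.push('error: ' + err),
        () => log.push('completed'));

console.log(log); // [ 20, 40, 'completed' ]
```

The three callbacks passed to `subscribe` here play the same roles as the three callbacks passed to `subscribe` in the RxJS examples in this post.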

Create Observable

I am not covering the steps to set up an RxJS environment here. You can follow the RxJS installation guide, or use a boilerplate project to get started. Observables can be created in several ways. I will cover a basic implementation of each.

Creating observable with event

When we create an observable from an event, it listens to that event. We use the fromEvent operator for this. Suppose we have a button and we want to observe its click event.

<!--index.html-->
...
<button id="btn">Click</button>
...
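//app.js
import $ from 'jquery'; // assumes jQuery is installed; drop this line if $ is already a global
import Rx from 'rxjs/Rx';

const btn = $('#btn');
// Creating the observable
const btnStream$ = Rx.Observable.fromEvent(btn, 'click');

// Subscribing to the observable (subscribe returns a Subscription, not an observable)
btnStream$.subscribe(
    (event) => { // called on every click
        console.log('Button clicked');
    },
    (error) => { // error callback
        console.log(error);
    },
    () => { // callback to signal that the stream has completed
        console.log('completed');
    });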

//Output after clicking the button
//Button clicked

In the above code I have created a variable btnStream$ to hold the observable. Notice the $ sign at the end. It is not required, but it is a good practice because it shows at a glance which of our variables are observables.

Now let's add another event:

<!--index.html-->
...
<input id="input" type="text">
...
<button id="btn">Click me</button>
...
//app.js
import $ from 'jquery'; // assumes jQuery is installed; drop this line if $ is already a global
import Rx from 'rxjs/Rx';

const btn = $('#btn');
const input = $('#input');

// Creating the observables
const btnStream$ = Rx.Observable.fromEvent(btn, 'click');
const inputStream$ = Rx.Observable.fromEvent(input, 'keyup');

btnStream$.subscribe(
    (event) => { // called on every click
        console.log('Clicked');
    },
    (error) => { // error callback
        console.log(error);
    },
    () => { // callback to signal that the stream has completed
        console.log('completed');
    });

inputStream$.subscribe((event) => { // called on every keyup
    console.log(event.target.value); // logging the current value of the input
});

// Output after clicking the button
// Clicked

// Output after entering value eg 'hello' in the input
// h
// he
// hel
// hell
// hello

If you look at the code above, I haven't passed the second and third callbacks when subscribing to the input observable. Only the first callback is needed. The other two arguments of subscribe are optional.

Also, when you run this code you will notice that the complete callback is never called. An event is an ongoing process: there can always be another click or key press, so the stream does not end on its own.
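Since an event stream does not end by itself, the way to stop listening is to unsubscribe. In RxJS, subscribe returns a Subscription object with an unsubscribe method. The sketch below mimics that behaviour in plain JavaScript using the standard `EventTarget` (available in browsers and in recent Node versions). `fromEventSketch` is a hypothetical helper, not the RxJS `fromEvent` implementation.

```javascript
// Hypothetical helper, not part of RxJS: wraps addEventListener in a subscribe-style API.
function fromEventSketch(target, eventName) {
    return {
        subscribe(next) {
            target.addEventListener(eventName, next);
            // Hand back a way to stop listening, similar in spirit to an RxJS Subscription
            return {
                unsubscribe: () => target.removeEventListener(eventName, next),
            };
        },
    };
}

const target = new EventTarget(); // stands in for a DOM element such as a button
const clicks = [];

const subscription = fromEventSketch(target, 'click')
    .subscribe((event) => clicks.push(event.type));

target.dispatchEvent(new Event('click'));
target.dispatchEvent(new Event('click'));
subscription.unsubscribe();
target.dispatchEvent(new Event('click')); // not recorded: we stopped listening

console.log(clicks); // [ 'click', 'click' ]
```

Note that no "completed" signal appears anywhere in this sketch: the source could keep firing events forever, and only the subscriber decides when to stop.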

Creating observable with data structures

We can also create observables from data structures such as arrays, maps and sets. We simply use the from operator instead of fromEvent. Example:

//app.js

import Rx from 'rxjs/Rx';

const array = [1, 2, 3, 4, 5];
// Creating the observable from the array
const arrayStream$ = Rx.Observable.from(array);

arrayStream$.subscribe(
    (element) => { // called once for every element of the array
        console.log(element);
    },
    (error) => { // error callback
        console.log(error);
    },
    () => { // callback to signal that the stream has completed
        console.log('completed');
    });

// Output
// 1
// 2
// 3
// 4
// 5
// completed

Notice that we get the completed log with this one. The array is static and finite, so the stream completes as soon as the last element has been emitted.

Let's try it out with a Set as well:

//app.js

import Rx from 'rxjs/Rx';

const set = new Set(['string', 25, {text: 'some text'}]);
// Creating the observable from the set
const setStream$ = Rx.Observable.from(set);

setStream$.subscribe(
    (element) => { // called once for every element of the set
        console.log(element);
    },
    (error) => { // error callback
        console.log(error);
    },
    () => { // callback to signal that the stream has completed
        console.log('completed');
    });

// Output
// string
// 25
// {text: 'some text'}
// completed
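The "emit every element, then complete" behaviour seen in the array and Set examples can be mimicked in a few lines of plain JavaScript. This is only a sketch of the idea: `fromIterable` is a hypothetical helper, not the RxJS `from` implementation, and it assumes the input is a synchronous iterable (array, Set, Map and so on).

```javascript
// Hypothetical helper, not part of RxJS: pushes each item of an iterable, then completes.
function fromIterable(iterable) {
    return {
        subscribe(next, error = () => {}, complete = () => {}) {
            try {
                for (const item of iterable) {
                    next(item); // one notification per element
                }
            } catch (err) {
                error(err); // a failure ends the stream without completing it
                return;
            }
            complete(); // a finite source finishes after its last element
        },
    };
}

const seen = [];

fromIterable([1, 2, 3])
    .subscribe((x) => seen.push(x), undefined, () => seen.push('completed'));

// A Map is iterated as [key, value] pairs
fromIterable(new Map([['a', 1]]))
    .subscribe((entry) => seen.push(entry));

console.log(seen); // [ 1, 2, 3, 'completed', [ 'a', 1 ] ]
```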

Creating observable from scratch

There are many cases where we want our own custom observable. We can get one by creating and configuring the observable ourselves.

To create a custom observable:

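import Rx from 'rxjs/Rx';

// The Observable constructor must be called with `new`
// (Rx.Observable.create(...) is an equivalent alternative)
const source$ = new Rx.Observable(observer => {
    console.log('Creating observable');
});

source$.subscribe(
    (x) => { // called for every emitted value
        console.log(x);
    },
    (error) => { // error callback
        console.log(error);
    },
    () => { // callback to signal that the stream has completed
        console.log('completed');
    });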

// Output
// Creating observable

In the above example none of the subscribe callbacks will be invoked, because the function we passed in never calls next, error or complete on the observer. We need to do that explicitly.

Let's update the code:

import Rx from 'rxjs/Rx';

const source$ = new Rx.Observable(observer => {
    console.log('Creating observable');

    observer.next('Hello'); // emits a value
    observer.complete(); // tells the observer that the stream is complete
});

source$.subscribe(
    (x) => { // called for every emitted value
        console.log(x);
    },
    (error) => { // error callback
        console.log(error);
    },
    () => { // callback to signal that the stream has completed
        console.log('completed');
    });

// Output
// Creating observable
// Hello
// completed

Creating from promise

We can also create observables from a promise. The observable then watches for the promise to be resolved. To create it we use the fromPromise operator. We will create a promise and then subscribe to it:

import Rx from 'rxjs/Rx';

// creating a promise
const myPromise = new Promise((resolve, reject) => {
    console.log('creating promise');
    setTimeout(() => { // just to make it a dummy promise
        resolve('resolving promise');
    }, 3000); // 3 sec timeout
});

// Creating the observable from the promise
const myPromiseStream$ = Rx.Observable.fromPromise(myPromise);

myPromiseStream$.subscribe(
    (x) => { // called with the resolved value
        console.log(x);
    },
    (error) => { // error callback, called if the promise is rejected
        console.log(error);
    },
    () => { // callback to signal that the stream has completed
        console.log('completed');
    });

// Output
// creating promise
// resolving promise   (after 3 seconds)
// completed
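How a promise maps onto the three subscribe callbacks can be sketched in plain JavaScript: a resolved promise produces one value followed by completion, and a rejected promise goes to the error callback. `fromPromiseSketch` and `record` are hypothetical helpers for illustration, not the RxJS `fromPromise` implementation.

```javascript
// Hypothetical helper, not part of RxJS: adapts a promise to subscribe-style callbacks.
function fromPromiseSketch(promise) {
    return {
        subscribe(next, error = () => {}, complete = () => {}) {
            promise.then(
                (value) => {
                    next(value); // a resolved promise yields exactly one value...
                    complete();  // ...and then the stream is finished
                },
                (reason) => error(reason)); // a rejection goes to the error callback
        },
    };
}

const events = [];
// Builds a callback that records which notification fired, and with what payload
const record = (label) => (payload) =>
    events.push(payload === undefined ? label : `${label}: ${payload}`);

fromPromiseSketch(Promise.resolve('resolving promise'))
    .subscribe(record('next'), record('error'), record('completed'));

fromPromiseSketch(Promise.reject('network down'))
    .subscribe(record('next'), record('error'), record('completed'));

// Promise callbacks run asynchronously, so read the log on a later tick
const finished = new Promise((resolve) => setTimeout(() => resolve(events), 0));
finished.then((log) => console.log(log));
// [ 'next: resolving promise', 'completed', 'error: network down' ]
```

The rejected case never reaches the completed callback: an error ends the stream on its own.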

Let's look at a more practical example.

import $ from 'jquery'; // assumes jQuery is installed; drop this line if $ is already a global
import Rx from 'rxjs/Rx';

// Takes a username, fetches that user's data from GitHub and returns it as a promise.
function getUser(username){
    return $.ajax({
        url: 'https://api.github.com/users/'+username,
        dataType: 'jsonp'
    }).promise();
}

Rx.Observable.fromPromise(getUser('ZenabKhan'))
    .subscribe(
    (x) => { // called once the request has succeeded
        console.log(x.data.name); // with JSONP the user object is wrapped in a `data` field
    });

// Output
// Zenab Saleem

So we created a promise that fetches a GitHub user's data, and we subscribed to it through an observable. When the data becomes available, we log it: in this case, the user's name.

That's it for now. Hope it was easy to grasp. Please do give your feedback or suggestions.
