Skip to content

Instantly share code, notes, and snippets.

@gugadev gugadev/observables.ts
Last active Jan 18, 2019

Embed
What would you like to do?
RxJs Observables
import chalk from 'chalk';
import {
Observable,
Observer,
Subscription,
Subject,
Subscriber,
ConnectableObservable,
BehaviorSubject,
ReplaySubject,
AsyncSubject,
from
} from 'rxjs';
import { multicast } from 'rxjs/operators';
const l = console.log;
let observable: Observable<any>;
let observer: Observer<any>;
let multicasted: ConnectableObservable<any>;
let subject: Subject<any>;
let behaviourSubject: BehaviorSubject<any>;
let replaySubject: ReplaySubject<any>;
let asyncSubject: AsyncSubject<any>;
let subscription: Subscription;
/**
* Observable - colección lazy de múltiples valores.
*
* No es una coincidencia que el callback de "create"
* sea un subscriber y que un Observable tenga un
* método "subscribe". De hecho, cuando un Observer
* se subscribe a un Observable, el "subscriber" es
* ejecutado inmediatamente.
*/
console.log(chalk.whiteBright(`
=== Observable ===
\n`));
observable = Observable.create((subscriber: Subscriber<any>) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
/**
* Observer | PartialObserver - consumidor de valores entregados
* por un Observable.
*
* Contiene tres métodos:
*
* next: (value: any) - es ejecutado cada vez que el observable
* emite un nuevo valor.
* error: (error: Error|any) - es ejecutado cuando el observable
* emite un error.
* complete: () - es ejecutado cuando el observable ejecuta "complete()".
*
* Para poder subscribirlo
* a un Observable, usamos el método "subscribe".
*/
observer = {
next: val => l(chalk.greenBright(`Siguiente valor: `), chalk.whiteBright(val)),
error: err => l(chalk.redBright(`Error: ${err}`)),
complete: () => l(chalk.whiteBright('\nYa no hay más valores'))
};
/**
* Subscription - Representa la suscripción a un Observable.
* Por medio de este objeto podemos "desubscribir"
* un Observer.
*/
subscription = observable.subscribe(observer);
setTimeout(() => {
subscription.unsubscribe();
}, 900);
/**
* Subject
*
* Es un tipo especial de Observable que permite
* que los valores sean difundidos a muchos Observers.
* Mientras que los simples son unicast, es decir,
* cada Observer suscrito posee una ejecución indepen
* diente del Observable, los Subject son multicast.
* Un Subject funciona como un EventEmitter, almace-
* nando a todos sus subscriptores y notificando a
* todos ellos.
*/
console.log(chalk.whiteBright(`\n
=== Subject ===
`));
subject = new Subject();
subject.subscribe({
next: v => l(chalk.greenBright(`\nObserver A: `, chalk.whiteBright(v)))
});
subject.subscribe({
next: v => l(chalk.greenBright(`Observer B: `), chalk.whiteBright(v))
});
subject.next('Hola');
/**
* Multicast Observable
*
* Pasa notificaciones a través de un Subject el cual puede
* tener muchos Subscriber, mientras que un "unicast" Observable
* solo envía notificaciones a un solo Observer.
*
* Internamente, los Observer se subscriben a un Subject y este
* se subscribe a un Observable.
*/
console.log(chalk.whiteBright(`\n
=== Multicast Observable ===
`));
subject = new Subject<any>();
multicasted = from([100, 200, 300]).pipe(multicast(subject)) as ConnectableObservable<any>;
multicasted.subscribe({
next: v => l(chalk.greenBright(`\nObserver C: `, chalk.whiteBright(v)))
});
multicasted.subscribe({
next: v => l(chalk.greenBright(`Observer D: `, chalk.whiteBright(v)))
});
multicasted.connect();
/**
* BehaviourSubject
*
* Una de las variantes de Subject es BehaviourSubject, el cual
* tiene la noción de "current value". Este almacena el último
* valor emitido por sus consumidores y, cuando un nuevo Obser-
* ver se subscribe, este inmediatamente recibe el "current va-
* lue" desde el BehaviourSubject.
*
* Pueden ser útiles para representar "valores sobre el tiempo".
* En instancia, un evento de stream de cumpleaños es un Subject
* pero el stream de la edad de una persona puede ser un Behavi-
* ourSubject.
*/
console.log(chalk.whiteBright(`\n
=== BehaviourSubject Observable ===
`));
behaviourSubject = new BehaviorSubject(0)
behaviourSubject.subscribe({
next: v => l(chalk.greenBright('Observer E: '), chalk.whiteBright(v))
})
behaviourSubject.next(1000)
behaviourSubject.next(2000)
behaviourSubject.subscribe({
next: v => l(chalk.greenBright('Observer F: '), chalk.whiteBright(v))
})
behaviourSubject.next(3000)
/**
* ReplaySubject
*
* Es similar a BehaviourSubject en la manera en que te envía
* el valor acutal con la diferencia que también te envía
* nuevos valores. Así mismo, puede recordar parte de la eje-
* cución de un Observable.
* El constructor de ReplaySubject acepta dos parámetros, el
* primero es obligatorio y es la cantidad de valores a reco-
* rdar y el segundo opcional, nos dice la cantidad de tiempo
* máximo de edad en milisegundos que estos valores pueden
* tener.
*/
console.log(chalk.whiteBright(`\n
=== ReplaySubject Observable ===
`));
// recuerda 2 valores para nuevos subscriptores
replaySubject = new ReplaySubject(2)
replaySubject.subscribe({
next: v => l(chalk.greenBright('Observer G: '), chalk.whiteBright(v))
})
replaySubject.next('A')
replaySubject.next('B')
replaySubject.next('C')
replaySubject.subscribe({
next: v => l(chalk.greenBright('Observer H: '), chalk.whiteBright(v))
})
subject.next('D')
/**
* AsyncSubject
*
* Es una variante donde solo el último valor de la ejecución
* del Observable es enviada a los Observer, y solo cuando la
* ejecución se completa.
*/
console.log(chalk.whiteBright(`\n
=== AsyncSubject Observable ===
`));
asyncSubject = new AsyncSubject()
asyncSubject.subscribe({
next: v => l(chalk.greenBright('Observer I: '), chalk.whiteBright(v))
})
asyncSubject.next(10)
asyncSubject.next(20)
asyncSubject.next(30)
asyncSubject.next(40)
asyncSubject.subscribe({
next: v => l(chalk.greenBright('Observer J: '), chalk.whiteBright(v))
})
asyncSubject.next(50)
asyncSubject.complete()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.