Skip to content

Instantly share code, notes, and snippets.

@stvkoch
Last active October 15, 2018 16:27
Show Gist options
  • Save stvkoch/036ab7c8acb27be885af49c6046cd1d6 to your computer and use it in GitHub Desktop.
Save stvkoch/036ab7c8acb27be885af49c6046cd1d6 to your computer and use it in GitHub Desktop.
RX.js

RX.js

Terms: observable, observer, subscribe, Subject, BehaviorSubject, ReplaySubject, AsyncSubject, subscription, multicast, publish, publishReplay, publishBehavior, publishLast

Observable, Observer e Subscribe.

Observable

observable representa um streaming de valores que é emitido pelo observable ao observer.

Diferente de uma Promise, um observable pode retorna, nenhum valor, um valor, muitos valores ou mesmo infinitos valores, entretanto um observable só é executado quando passamos um observer atravez do metodo subcribe. Ao contrario de uma Promise que é executada quando criamos a promise, que então será resolvida ou rejeitada.

Observer

Observer serve para receber valores emitidos por um observable.

Um observer é um objecto que implementa a seguinte estrutura.

var observer = {
	next: function (value) {...},
	error: function (err) {...},
	complete: function () {...},
};

Subscribe

var observable = Rx.Observable.interval(1000).take(5);
var observer = {
	next: function (value) {...},
	error: function (err) {...},
	complete: function () {...},
};
observable.subscribe(observer);

Só após subscrever ao observable é que o observable é executado. E a cada subscrição do mesmo observable é criado uma nova execução. Oque faz com que não seja possivel subscrever ao mesmo observable com diferentes observer.

Subscription

O subscribe returna um objecto do tipo subscription, este objecto é possivel unsubscribe o observable e assim parar a execução do observable.

subscription objectos é um importante objecto retornado pelo subscribe, tente sempre fazer bom uso dele.

Subcrevendo multiplos observer com subject

Terms: observable, observer, subscribe, subject

Para subscrever multiplos observer a um mesmo observable é usado um subject object. A grosso modo: é o subject que é subscrito ao observable e então é possivel subscrever tantos quantos observer quiseres a este subject.

Até então tudo é bastante simples, entretanto existe diferentes tipos de subject.

subject é um tipo de objecto que implementa tanto um observable quanto um observer. Ou seja é possivel subcrever observers e emitir valores.

var subject = new  Rx.Subject();
var observer = {
	next: function (x) { console.log('Value from subject ' + x) },
	error: function (err) { console.log('Error from subject ' + err) },
	complete: function () { console.log('Subject is done') },
};
subject.subscribe(observer);

subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();

Subject

Subject é um tipo bastante simple de Observable no qual permite que tenhamos mais de um observer por observable.

var observable = Rx.Observable.interval(1000).take(5);
var subject = new  Rx.Subject();
var observerAsFromZero = {
	next: function (x) { console.log('Next ' + x) },
	error: function (err) { console.log('Error ' + err) },
	complete: function () { console.log('Done') },
};
observable.subscribe(subject);
// ...
subject.subscribe(observer);

var observerAsFromTwo = {
	next: function (x) { console.log('Next ' + x) },
	error: function (err) { console.log('Error ' + err) },
	complete: function () { console.log('Done') },
};
setTimeout(function () {
	subject.subscribe(observerAsFromTwo);
}, 2000);

Mas existe casos em que deseja que o subject tenha outros comportamentos, por exemplo, guarde os valorers emitidos anteriores pelo observable e depois reenvie para novos observer subscritos depois que estes valores foram emitidos.

Este é um caso para outros tipos de subject.

BehaviorSubject

Terms: BehaviorSubject, subject, observer, observable, subscribe

É um tipo de subject que irá enviar ao observer subscrito o último valor recebido, mesmo que este valor tenha sido enviado muito antes do observer ser subscribe.

ReplaySubject

Terms: ReplaySubject, subject, observer, subscribe

É um tipo de subject que irá manter os ultimos X values para que seja reenviado ao observer subscrito.

O buffer é o valor passado ao criar um ReplaySubject que diz o tamanho maximo que o array de valores/objectos/qualquer coisa deve ter, em outras palavras é tamanho do espaço de buffer que o ReplaySubject devera ter.

AsyncSubject

Terms: BehaviorSubject, AsyncSubject, observable

AsyncSubject é muito parecido com BehaviorSubject com a diferença que irá emitir o ultimo valor somente depois de completado o observable.

Multicast

Como sabes subscrever a um observable usando um subject ou observer faz com que o observable comece a ser executado.

observable.subscribe(subject);

Apesar de subscribe dar o controle que geralmente precisas, é mais elegante usar um operador RX para que conecte o subject ao observable, este operador é multicast.

O multicast ira retornar um objecto do tipo ConnectableObservable.

var connectedObs = Rx.Observable
	.interval(1000)
	.take(5)
	.multicast(new Rx.Subject());
	
var observer = {
	next: function (value) {...},
	error: function (err) {...},
	complete: function () {...},
};
connectedObs.subscribe(observer);
connectedObs.connect()

O metodo connect irá literalmente subscrever usando subscribe o Subject passado ao multicast, e assim iniciar a execução do observable e retorna um objecto do tipo subscription.

Iniciando e parando ConnectableObservable automaticamente.

Existe um operador de ConnectableObservable que ira connectar ao observable quando recever o primeiro subscribe e ira parar (unsubscribe) quando a ultima Subscription for unsubscribe.

var observable = Rx.Observable
	.interval(1000)
	.multicast(new Rx.Subject())
	.refCount();
	
var observer = {
	next: function (value) {...},
	error: function (err) {...},
	complete: function () {...},
};
var started = observable.subscribe(observer);
started.unsubscribe();

Atalhos para criar multicast

Use o metodo publish de um observable para automaticamente criar multicast.

Automaticamente criar um multicast de Subject

var observable = Rx.Observable
	.interval(1000)
	.multicast(new Rx.Subject());

é igual a:

var observable = Rx.Observable
	.interval(1000)
	.publish();

Para cada tipo de subject existe um metodo publish apropriado:

  • .publish() -> .multicast(new Rx.Subject())
  • .publishReplay(bufferSize) -> .multicast(new Rx.ReplaySubject(bufferSize))
  • .publishBehavior('initialValue') -> .multicast(new Rx.BehaviorSubject('initialValue'))
  • .publishLast() -> .multicast(new Rx.AsyncSubject())

Reusar multicasts

Quando um subject é completo (complete), este subject não emite mais valores, entretanto ha casos que quer que seja emitido novamente valores quando um multicast que ja terminou receba uma nova subscrição.

Nestes casos usamos uma 'SubjetFactory' function:

var subjectFactory = function() {
	return new Rx.Subject();
}
var shared = Rx.Observable
                   .interval(1000)
                   .take(5)
                   .multicast(subjectFactory)
                   .refCount();


var observerAsFromZero = {
	next: function(a){console.log('A -> ' + a);},
	error: function(e){console.log('Error A -> ' + e);},
	complete: function(){console.log('Done A');}
};
console.log('subscribe shared with A');
shared.subscribe(observerAsFromZero);


var observerAsFromTwo = {
	next: function(a){console.log('B -> ' + a);},
	error: function(e){console.log('Error B -> ' + e);},
	complete: function(){console.log('Done B');}
};

setTimeout(function () {
  console.log('subscribe shared with B');
  shared.subscribe(observerAsFromTwo);
}, 2000);

setTimeout(function () {
  console.log('subscribe shared with A again');
  shared.subscribe(observerAsFromZero);
}, 6000);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment