Terms: observable, observer, subscribe, Subject, BehaviorSubject, ReplaySubject, AsyncSubject, subscription, multicast, publish, publishReplay, publishBehavior, publishLast
observable representa um streaming de valores que é emitido pelo observable ao observer.
Diferente de uma Promise, um observable pode retorna, nenhum valor, um valor, muitos valores ou mesmo infinitos valores, entretanto um observable só é executado quando passamos um observer atravez do metodo subcribe. Ao contrario de uma Promise que é executada quando criamos a promise, que então será resolvida ou rejeitada.
Observer serve para receber valores emitidos por um observable.
Um observer é um objecto que implementa a seguinte estrutura.
var observer = {
next: function (value) {...},
error: function (err) {...},
complete: function () {...},
};
var observable = Rx.Observable.interval(1000).take(5);
var observer = {
next: function (value) {...},
error: function (err) {...},
complete: function () {...},
};
observable.subscribe(observer);
Só após subscrever ao observable é que o observable é executado. E a cada subscrição do mesmo observable é criado uma nova execução. Oque faz com que não seja possivel subscrever ao mesmo observable com diferentes observer.
O subscribe returna um objecto do tipo subscription, este objecto é possivel unsubscribe o observable e assim parar a execução do observable.
subscription objectos é um importante objecto retornado pelo subscribe, tente sempre fazer bom uso dele.
Terms: observable, observer, subscribe, subject
Para subscrever multiplos observer a um mesmo observable é usado um subject object. A grosso modo: é o subject que é subscrito ao observable e então é possivel subscrever tantos quantos observer quiseres a este subject.
Até então tudo é bastante simples, entretanto existe diferentes tipos de subject.
subject é um tipo de objecto que implementa tanto um observable quanto um observer. Ou seja é possivel subcrever observers e emitir valores.
var subject = new Rx.Subject();
var observer = {
next: function (x) { console.log('Value from subject ' + x) },
error: function (err) { console.log('Error from subject ' + err) },
complete: function () { console.log('Subject is done') },
};
subject.subscribe(observer);
subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();
Subject é um tipo bastante simple de Observable no qual permite que tenhamos mais de um observer por observable.
var observable = Rx.Observable.interval(1000).take(5);
var subject = new Rx.Subject();
var observerAsFromZero = {
next: function (x) { console.log('Next ' + x) },
error: function (err) { console.log('Error ' + err) },
complete: function () { console.log('Done') },
};
observable.subscribe(subject);
// ...
subject.subscribe(observer);
var observerAsFromTwo = {
next: function (x) { console.log('Next ' + x) },
error: function (err) { console.log('Error ' + err) },
complete: function () { console.log('Done') },
};
setTimeout(function () {
subject.subscribe(observerAsFromTwo);
}, 2000);
Mas existe casos em que deseja que o subject tenha outros comportamentos, por exemplo, guarde os valorers emitidos anteriores pelo observable e depois reenvie para novos observer subscritos depois que estes valores foram emitidos.
Este é um caso para outros tipos de subject.
Terms: BehaviorSubject, subject, observer, observable, subscribe
É um tipo de subject que irá enviar ao observer subscrito o último valor recebido, mesmo que este valor tenha sido enviado muito antes do observer ser subscribe.
Terms: ReplaySubject, subject, observer, subscribe
É um tipo de subject que irá manter os ultimos X values para que seja reenviado ao observer subscrito.
O buffer é o valor passado ao criar um ReplaySubject que diz o tamanho maximo que o array de valores/objectos/qualquer coisa deve ter, em outras palavras é tamanho do espaço de buffer que o ReplaySubject devera ter.
Terms: BehaviorSubject, AsyncSubject, observable
AsyncSubject é muito parecido com BehaviorSubject com a diferença que irá emitir o ultimo valor somente depois de completado o observable.
Como sabes subscrever a um observable usando um subject ou observer faz com que o observable comece a ser executado.
observable.subscribe(subject);
Apesar de subscribe dar o controle que geralmente precisas, é mais elegante usar um operador RX para que conecte o subject ao observable, este operador é multicast.
O multicast ira retornar um objecto do tipo ConnectableObservable.
var connectedObs = Rx.Observable
.interval(1000)
.take(5)
.multicast(new Rx.Subject());
var observer = {
next: function (value) {...},
error: function (err) {...},
complete: function () {...},
};
connectedObs.subscribe(observer);
connectedObs.connect()
O metodo connect irá literalmente subscrever usando subscribe o Subject passado ao multicast, e assim iniciar a execução do observable e retorna um objecto do tipo subscription.
Existe um operador de ConnectableObservable que ira connectar ao observable quando recever o primeiro subscribe e ira parar (unsubscribe) quando a ultima Subscription for unsubscribe.
var observable = Rx.Observable
.interval(1000)
.multicast(new Rx.Subject())
.refCount();
var observer = {
next: function (value) {...},
error: function (err) {...},
complete: function () {...},
};
var started = observable.subscribe(observer);
started.unsubscribe();
Use o metodo publish de um observable para automaticamente criar multicast.
var observable = Rx.Observable
.interval(1000)
.multicast(new Rx.Subject());
é igual a:
var observable = Rx.Observable
.interval(1000)
.publish();
Para cada tipo de subject existe um metodo publish apropriado:
- .publish() -> .multicast(new Rx.Subject())
- .publishReplay(bufferSize) -> .multicast(new Rx.ReplaySubject(bufferSize))
- .publishBehavior('initialValue') -> .multicast(new Rx.BehaviorSubject('initialValue'))
- .publishLast() -> .multicast(new Rx.AsyncSubject())
Quando um subject é completo (complete), este subject não emite mais valores, entretanto ha casos que quer que seja emitido novamente valores quando um multicast que ja terminou receba uma nova subscrição.
Nestes casos usamos uma 'SubjetFactory' function:
var subjectFactory = function() {
return new Rx.Subject();
}
var shared = Rx.Observable
.interval(1000)
.take(5)
.multicast(subjectFactory)
.refCount();
var observerAsFromZero = {
next: function(a){console.log('A -> ' + a);},
error: function(e){console.log('Error A -> ' + e);},
complete: function(){console.log('Done A');}
};
console.log('subscribe shared with A');
shared.subscribe(observerAsFromZero);
var observerAsFromTwo = {
next: function(a){console.log('B -> ' + a);},
error: function(e){console.log('Error B -> ' + e);},
complete: function(){console.log('Done B');}
};
setTimeout(function () {
console.log('subscribe shared with B');
shared.subscribe(observerAsFromTwo);
}, 2000);
setTimeout(function () {
console.log('subscribe shared with A again');
shared.subscribe(observerAsFromZero);
}, 6000);