Skip to content

Instantly share code, notes, and snippets.

@ysyun
Last active June 26, 2017 03:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 6 You must be signed in to fork a gist
  • Save ysyun/6e91e1b02311bb04b59841ef1e09747b to your computer and use it in GitHub Desktop.
Save ysyun/6e91e1b02311bb04b59841ef1e09747b to your computer and use it in GitHub Desktop.
RxJS Subjects and Multicasting Operators
// Demo: multiply every value of interval(1000).take(5) by ten with map() and
// list each result inside the #operator element.
const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);

// A hand-rolled operator (multipleBy) was sketched here before switching to
// the built-in map(): it wrapped the source in Rx.Observable.create(),
// pushed cb(x) to the downstream observer, and was meant to be installed on
// Rx.Observable.prototype.
const transform = observable.map((tick) => {
  return tick * 10;
});

transform.subscribe((scaled) => {
  dom.append('<li>' + scaled + '</li>');
});
// Demo: two observers subscribe directly to the same
// interval(1000).take(5) observable; the second joins three seconds later.
const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);

// First observer: prefixes everything it prints with "A:".
const observerA = {
  next(value) {
    dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>A:done</li>');
  },
};
observable.subscribe(observerA);

// Second observer: same shape, prefixed with "B:".
const observerB = {
  next(value) {
    dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>B:done</li>');
  },
};

// B subscribes to the observable itself, 3 s after A did.
setTimeout(function subscribeObserverB() {
  observable.subscribe(observerB);
}, 3000);
// reference count 0 -> 1, 1 -> 2, 2 -> 1, 1 -> 0
// use autoConnectableObservable
// use refCount()
// Demo: reference-counted multicasting. The connectable returned by
// multicast() is wrapped with refCount(), and the timers below add and drop
// subscribers.
const dom = $('#operator');

const autoConnectableObservable = Rx.Observable.interval(1000)
  .do((tick) => {
    dom.append('<li> source: ' + tick + '</li>');
  })
  // take(5) is left disabled here, so the interval is not capped.
  // NOTE(review): the bridge is an AsyncSubject and the source above has no
  // take(); presumably A and B never receive a value - confirm this is intended.
  .multicast(new Rx.AsyncSubject(1))
  .refCount(); // takes the place of calling connect() by hand

// First observer: prefixes everything it prints with "A:".
const observerA = {
  next(value) {
    dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>A:done</li>');
  },
};

dom.append('<li>subscribe observerA</li>');
let subA = autoConnectableObservable.subscribe(observerA); // refCount 0 -> 1

// Second observer: same shape, prefixed with "B:".
const observerB = {
  next(value) {
    dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>B:done</li>');
  },
};

let subB;

// Both 2 s timers are registered in this order: B joins, then A leaves.
setTimeout(function subscribeObserverB() {
  dom.append('<li>subscribe observerB</li>');
  subB = autoConnectableObservable.subscribe(observerB); // refCount 1 -> 2
}, 2000);

setTimeout(function unsubscribeObserverA() {
  subA.unsubscribe(); // refCount 2 -> 1
}, 2000);

setTimeout(function unsubscribeObserverB() {
  subB.unsubscribe(); // refCount 1 -> 0, end
}, 4000);
// use publish for multicast(Subject)
// use publish().refCount()
// use share()
// Demo: the same subscribe/unsubscribe timeline as a multicast + refCount
// setup, written with the share() shorthand.
const dom = $('#operator');

const autoConnectableObservable = Rx.Observable.interval(1000)
  .do((tick) => {
    dom.append('<li> source: ' + tick + '</li>');
  })
  // Disabled alternatives kept for reference: take(2), and the long form
  // publish().refCount(), where publish() === multicast(new Rx.Subject()).
  .share(); // === publish().refCount()

// First observer: prefixes everything it prints with "A:".
const observerA = {
  next(value) {
    dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>A:done</li>');
  },
};

dom.append('<li>subscribe observerA</li>');
let subA = autoConnectableObservable.subscribe(observerA); // refCount 0 -> 1

// Second observer: same shape, prefixed with "B:".
const observerB = {
  next(value) {
    dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>B:done</li>');
  },
};

let subB;

// Both 2 s timers are registered in this order: B joins, then A leaves.
setTimeout(function subscribeObserverB() {
  dom.append('<li>subscribe observerB</li>');
  subB = autoConnectableObservable.subscribe(observerB); // refCount 1 -> 2
}, 2000);

setTimeout(function unsubscribeObserverA() {
  subA.unsubscribe(); // refCount 2 -> 1
}, 2000);

setTimeout(function unsubscribeObserverB() {
  subB.unsubscribe(); // refCount 1 -> 0, end
}, 4000);
// Standalone subject instance. It is not referenced again in this script,
// which passes subjectFactory to multicast() instead.
const subject = new Rx.Subject();
/**
 * Factory passed to multicast(): creates the subject used as the bridge.
 *
 * @returns {Rx.Subject} a newly constructed subject on every call
 */
function subjectFactory() {
  const freshSubject = new Rx.Subject();
  return freshSubject;
}
// Demo: multicast() is given a subject factory rather than a subject
// instance, then refCount() manages the connection. A re-subscribes at 6 s,
// after both subscriptions were dropped.
// NOTE(review): presumably the factory lets that late re-subscription start
// with a new subject - confirm against the multicast documentation.
const dom = $('#operator');

const observable = Rx.Observable.interval(1000)
  .take(7)
  .do((tick) => {
    dom.append('<li>source ' + tick + '</li>');
  })
  .multicast(subjectFactory)
  .refCount();

// First observer: prefixes everything it prints with "A:".
const observerA = {
  next(value) {
    dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>A:done</li>');
  },
};

dom.append('--subscribe A');
let subA = observable.subscribe(observerA);

// Second observer: same shape, prefixed with "B:".
const observerB = {
  next(value) {
    dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>B:done</li>');
  },
};

let subB;

// 3 s: B joins.
setTimeout(function subscribeObserverB() {
  dom.append('--subscribe B');
  subB = observable.subscribe(observerB);
}, 3000);

// 4 s: A leaves.
setTimeout(function unsubscribeObserverA() {
  subA.unsubscribe();
}, 4000);

// 4.1 s: B leaves, so no subscriber remains.
setTimeout(function unsubscribeObserverB() {
  subB.unsubscribe();
}, 4100);

// 6 s: A subscribes again.
setTimeout(function resubscribeObserverA() {
  subA = observable.subscribe(observerA);
}, 6000);
// If you want to manipulate the stream and then share it
// jQuery selection for the element with id "operator"; this script appends its output to it.
const dom = $('#operator');
/**
 * Factory passed to multicast(): creates the subject that bridges the source
 * to its subscribers.
 *
 * Rx.Subject is a constructor and must be invoked with `new`; calling it as
 * a plain function does not yield a subject instance.
 *
 * @returns {Rx.Subject} a newly constructed subject on every call
 */
function subjectFactory() {
  return new Rx.Subject();
}
// Eight ticks are logged, replaced by random numbers, and multicast through
// subjectFactory. The selector receives the shared stream and returns it
// merged with a copy of itself delayed by 500 ms.
const result = Rx.Observable.interval(1000).take(8)
  .do((tick) => {
    dom.append('<li>source' + tick + '</li>');
  })
  .map(() => Math.random())
  .multicast(subjectFactory, (shared) => shared.merge(shared.delay(500)));

// Print every value that comes out of the selector's result.
result.subscribe((value) => {
  dom.append('<li>' + value + '</li>');
});
// share multicast bridge observers
// jQuery selection for the element with id "operator"; output is appended to it.
const dom = $('#operator');
// Source for this demo: interval(1000) limited to its first five values.
const observable = Rx.Observable.interval(1000).take(5);
// Minimal hand-written bridge: it keeps a list of observers and, when used as
// an observer itself, relays every next/error/complete call to each of them.
const bridgeObservers = {
  // Observers registered so far, in subscription order.
  observers: [],

  // Register another observer; it receives whatever is relayed from now on.
  subscribe(observer) {
    this.observers.push(observer);
  },

  // Relay a value to every registered observer.
  next(value) {
    this.observers.forEach((target) => {
      target.next(value);
    });
  },

  // Relay an error to every registered observer.
  error(failure) {
    this.observers.forEach((target) => {
      target.error(failure);
    });
  },

  // Relay completion to every registered observer.
  complete() {
    this.observers.forEach((target) => {
      target.complete();
    });
  },
};
// First observer: prefixes everything it prints with "A:".
const observerA = {
  next(value) {
    dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>A:done</li>');
  },
};

dom.append('<li>subscribe observerA</li>');
// A registers with the bridge (not with the observable directly), and the
// bridge is the only thing subscribed to the observable.
bridgeObservers.subscribe(observerA);
observable.subscribe(bridgeObservers);

// Second observer: same shape, prefixed with "B:".
const observerB = {
  next(value) {
    dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>B:done</li>');
  },
};

// After 3 s, B also registers with the bridge instead of the observable.
setTimeout(function subscribeObserverB() {
  dom.append('<li>subscribe observerB</li>');
  bridgeObservers.subscribe(observerB);
}, 3000);
// change bridge observer to subject
// change addObservers to subscribe
// Demo: an Rx.Subject plays the bridge role. It is subscribed to the
// observable, and observers A, B and C subscribe to the subject at 0 s, 3 s
// and 6 s.
const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);
const bridgeObservers = new Rx.Subject();

// First observer: prefixes everything it prints with "A:".
const observerA = {
  next(value) {
    dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>A:done</li>');
  },
};

dom.append('<li>subscribe observerA</li>');
// A listens to the subject; the subject is what subscribes to the observable.
bridgeObservers.subscribe(observerA);
observable.subscribe(bridgeObservers);

// Second observer: prefixed with "B:".
const observerB = {
  next(value) {
    dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>B:done</li>');
  },
};

setTimeout(function subscribeObserverB() {
  dom.append('<li>subscribe observerB</li>');
  bridgeObservers.subscribe(observerB);
}, 3000);

// Third observer: prefixed with "C:".
const observerC = {
  next(value) {
    dom.append('<li>C:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>C:done</li>');
  },
};

setTimeout(function subscribeObserverC() {
  dom.append('<li>subscribe observerC</li>');
  bridgeObservers.subscribe(observerC);
}, 6000);
// remove observable
// use just subject (next, subscribe)
// Two observers subscribe directly to interval(1000).take(5); B subscribes
// three seconds after A.
// NOTE(review): the heading comment for this script mentions using only a
// subject, but no subject appears in the code - confirm which was intended.
const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);

// First observer: prefixes everything it prints with "A:".
const observerA = {
  next(value) {
    dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>A:done</li>');
  },
};
observable.subscribe(observerA);

// Second observer: same shape, prefixed with "B:".
const observerB = {
  next(value) {
    dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>B:done</li>');
  },
};

setTimeout(function subscribeObserverB() {
  observable.subscribe(observerB);
}, 3000);
// change subject to behavior subject
// Demo: a BehaviorSubject seeded with -1 sits between a three-value interval
// and two observers; B subscribes to the subject after 4 s.
const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(3);
const subject = new Rx.BehaviorSubject(-1);

// First observer: prefixes everything it prints with "A:".
const observerA = {
  next(value) {
    dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>A:done</li>');
  },
};

dom.append('<li>subscribe observerA</li>');
// A listens to the subject; the subject is what subscribes to the observable.
subject.subscribe(observerA);
observable.subscribe(subject);

// Second observer: same shape, prefixed with "B:".
const observerB = {
  next(value) {
    dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>B:done</li>');
  },
};

setTimeout(function subscribeObserverB() {
  dom.append('<li>subscribe observerB</li>');
  subject.subscribe(observerB);
}, 4000);
// change subject to replay subject
// Demo: a ReplaySubject constructed with 2 sits between a three-value
// interval and two observers; B subscribes to the subject after 4 s.
const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(3);
const subject = new Rx.ReplaySubject(2);

// First observer: prefixes everything it prints with "A:".
const observerA = {
  next(value) {
    dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>A:done</li>');
  },
};

dom.append('<li>subscribe observerA</li>');
// A listens to the subject; the subject is what subscribes to the observable.
subject.subscribe(observerA);
observable.subscribe(subject);

// Second observer: same shape, prefixed with "B:".
const observerB = {
  next(value) {
    dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>B:done</li>');
  },
};

setTimeout(function subscribeObserverB() {
  dom.append('<li>subscribe observerB</li>');
  subject.subscribe(observerB);
}, 4000);
// change subject to async subject
// Demo: an AsyncSubject sits between a three-value interval and two
// observers; B subscribes to the subject after 4 s.
const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(3);
const subject = new Rx.AsyncSubject();

// First observer: prefixes everything it prints with "A:".
const observerA = {
  next(value) {
    dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>A:done</li>');
  },
};

dom.append('<li>subscribe observerA</li>');
// A listens to the subject; the subject is what subscribes to the observable.
subject.subscribe(observerA);
observable.subscribe(subject);

// Second observer: same shape, prefixed with "B:".
const observerB = {
  next(value) {
    dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>B:done</li>');
  },
};

setTimeout(function subscribeObserverB() {
  dom.append('<li>subscribe observerB</li>');
  subject.subscribe(observerB);
}, 4000);
// add multicast syntax with new subject
// connectable observable use connect method
// Demo: multicast(new Rx.Subject()) yields a connectable observable that is
// connected explicitly, and observers subscribe to the connectable itself.
const dom = $('#operator');

const connectableObservable = Rx.Observable.interval(1000)
  .take(5)
  .multicast(new Rx.Subject());

// connect() is called right away. A variant that delayed it with a 2 s timer,
// and the earlier version that wired a separate subject by hand, were left
// disabled in the original script.
connectableObservable.connect();

// First observer: prefixes everything it prints with "A:".
const observerA = {
  next(value) {
    dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>A:done</li>');
  },
};

dom.append('<li>subscribe observerA</li>');
connectableObservable.subscribe(observerA);

// Second observer: same shape, prefixed with "B:".
const observerB = {
  next(value) {
    dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>B:done</li>');
  },
};

// B subscribes to the connectable (rather than to a bare subject) after 3 s.
setTimeout(function subscribeObserverB() {
  dom.append('<li>subscribe observerB</li>');
  connectableObservable.subscribe(observerB);
}, 3000);
// when to unsubscribe each subscription, or unsubscribe from the connectableObservable itself
// use do operator
// Demo: the subscription returned by connect() is kept and unsubscribed after
// 4 s, instead of unsubscribing A and B one by one. do() logs each source
// value.
const dom = $('#operator');

const connectableObservable = Rx.Observable.interval(1000)
  .do((tick) => {
    dom.append('<li> source: ' + tick + '</li>');
  })
  // take(5) is left disabled here, so the interval is not capped.
  .multicast(new Rx.Subject());

// Handle for the connection itself.
let sub = connectableObservable.connect();

// First observer: prefixes everything it prints with "A:".
const observerA = {
  next(value) {
    dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>A:done</li>');
  },
};

dom.append('<li>subscribe observerA</li>');
let subA = connectableObservable.subscribe(observerA);

// Second observer: same shape, prefixed with "B:".
const observerB = {
  next(value) {
    dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    dom.append(failure);
  },
  complete() {
    dom.append('<li>B:done</li>');
  },
};

let subB;

setTimeout(function subscribeObserverB() {
  dom.append('<li>subscribe observerB</li>');
  subB = connectableObservable.subscribe(observerB);
}, 3000);

// Only the connection is unsubscribed; subA and subB are deliberately left
// alone (their unsubscribe calls were disabled in the original script).
setTimeout(function disconnect() {
  sub.unsubscribe();
}, 4000);
<!doctype html>
<!-- Host page for the RxJS demo scripts: it loads the libraries and provides the output element. -->
<html lang="en">
<head>
<meta charset="utf-8">
<title>GistRun</title>
<!-- RxJS 5.4.0 and jQuery 2.2.4 are loaded from CDNs ahead of the demo script. -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.0/Rx.js"></script>
<script src="https://code.jquery.com/jquery-2.2.4.min.js"></script>
<link rel="stylesheet" href="styles.css">
</head>
<body>
<h1>Subjects and Multicasting</h1>
<!-- Output container: the demo scripts select it with $('#operator') and append to it. -->
<div id="operator"></div>
<!-- Only this one demo script is referenced by the page. -->
<script src="11-publish.js"></script>
</body>
</html>
/* List reset: no markers and no default spacing on lists and list items. */
/* todo: add styles */
li,
ol,
ul {
  list-style: none;
  margin: 0;
  padding: 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment