Last active
June 26, 2017 03:04
-
-
Save ysyun/6e91e1b02311bb04b59841ef1e09747b to your computer and use it in GitHub Desktop.
RxJS Subjects and Multicasting Operators
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const dom = $('#operator'); | |
const observable = Rx.Observable.interval(1000).take(5); | |
// function multipleBy(cb) { | |
// const source = this; | |
// return Rx.Observable.create( (observer) => { | |
// source.subscribe((x) => { | |
// const result = cb(x); | |
// observer.next(result); | |
// }, (err) => {}, () => dom.append('<li>done</li>')) | |
// } ); | |
// } | |
// Rx.Observable.prototype.multipleBy = multipleBy; | |
const transform = observable.map((x) => x * 10); | |
transform.subscribe((x) => dom.append('<li>' + x + '</li>')); | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const dom = $('#operator'); | |
const observable = Rx.Observable.interval(1000).take(5); | |
const observerA = { | |
next: (item) => dom.append('<li>A:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>A:done</li>'), | |
} | |
observable.subscribe(observerA); | |
const observerB = { | |
next: (item) => dom.append('<li>B:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>B:done</li>'), | |
} | |
setTimeout(() => { | |
observable.subscribe(observerB); | |
}, 3000); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// reference count 0 -> 1, 1 -> 2, 2 -> 1, 1 -> 0 | |
// use autoConnectableObservable | |
// use refCount() | |
const dom = $('#operator'); | |
const autoConnectableObservable = Rx.Observable.interval(1000) | |
.do((x) => dom.append('<li> source: ' + x + '</li>')) | |
// .take(5) | |
.multicast(new Rx.AsyncSubject(1)) | |
.refCount(); // 1) connect(), | |
const observerA = { | |
next: (item) => dom.append('<li>A:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>A:done</li>'), | |
} | |
dom.append('<li>subscribe observerA</li>'); | |
let subA = autoConnectableObservable.subscribe(observerA); | |
// refCount 0 -> 1 | |
const observerB = { | |
next: (item) => dom.append('<li>B:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>B:done</li>'), | |
} | |
let subB; | |
setTimeout(() => { | |
dom.append('<li>subscribe observerB</li>'); | |
subB = autoConnectableObservable.subscribe(observerB); | |
// refCount 1 -> 2 | |
}, 2000); | |
setTimeout(() => { | |
subA.unsubscribe(); // refCount 2 -> 1 | |
}, 2000); | |
setTimeout(() => { | |
subB.unsubscribe(); // refCount 1 -> 0 // end | |
}, 4000); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// use publish for multicast(Subject) | |
// use publish().refCount() | |
// use share() | |
const dom = $('#operator'); | |
const autoConnectableObservable = Rx.Observable.interval(1000) | |
.do((x) => dom.append('<li> source: ' + x + '</li>')) | |
// .take(2) | |
// .publish() // === multicast(new Rx.Subject()) | |
// .refCount(); // 1) connect(), | |
.share() // === publish().refCount(); | |
const observerA = { | |
next: (item) => dom.append('<li>A:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>A:done</li>'), | |
} | |
dom.append('<li>subscribe observerA</li>'); | |
let subA = autoConnectableObservable.subscribe(observerA); | |
// refCount 0 -> 1 | |
const observerB = { | |
next: (item) => dom.append('<li>B:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>B:done</li>'), | |
} | |
let subB; | |
setTimeout(() => { | |
dom.append('<li>subscribe observerB</li>'); | |
subB = autoConnectableObservable.subscribe(observerB); | |
// refCount 1 -> 2 | |
}, 2000); | |
setTimeout(() => { | |
subA.unsubscribe(); // refCount 2 -> 1 | |
}, 2000); | |
setTimeout(() => { | |
subB.unsubscribe(); // refCount 1 -> 0 // end | |
}, 4000); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const subject = new Rx.Subject(); | |
function subjectFactory() { | |
return new Rx.Subject(); | |
} | |
const dom = $('#operator'); | |
const observable = Rx.Observable.interval(1000) | |
.take(7) | |
.do(x => dom.append('<li>source ' + x + '</li>')) | |
.multicast(subjectFactory) | |
.refCount(); | |
const observerA = { | |
next: (item) => dom.append('<li>A:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>A:done</li>'), | |
} | |
dom.append('--subscribe A'); | |
let subA = observable.subscribe(observerA); | |
const observerB = { | |
next: (item) => dom.append('<li>B:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>B:done</li>'), | |
} | |
let subB; | |
setTimeout(() => { | |
dom.append('--subscribe B'); | |
subB = observable.subscribe(observerB); | |
}, 3000); | |
setTimeout(() => { | |
subA.unsubscribe(); | |
}, 4000); | |
setTimeout(() => { | |
subB.unsubscribe(); | |
}, 4100); | |
setTimeout(() => { | |
subA = observable.subscribe(observerA); | |
}, 6000); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// If you want to manipulation and then share | |
const dom = $('#operator'); | |
function subjectFactory() { | |
return Rx.Subject(); | |
} | |
const result = Rx.Observable.interval(1000).take(8) | |
.do(x => dom.append('<li>source' + x + '</li>')) | |
.map(x => Math.random()) | |
.multicast(subjectFactory, function selector(shared) { | |
const sharedDelayed = shared.delay(500); | |
const merged = shared.merge(sharedDelayed); | |
return merged; | |
}); | |
result.subscribe(x => dom.append('<li>' + x + '</li>')); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// share multicast bridge observers | |
const dom = $('#operator'); | |
const observable = Rx.Observable.interval(1000).take(5); | |
const bridgeObservers = { | |
observers: [], | |
subscribe: function(observer) { this.observers.push(observer); }, | |
next: function(x) { | |
this.observers.forEach((observer) => observer.next(x)) | |
}, | |
error: function(err) { | |
this.observers.forEach((observer) => observer.error(err)) | |
}, | |
complete: function() { | |
this.observers.forEach((observer) => observer.complete()) | |
} | |
} | |
const observerA = { | |
next: (item) => dom.append('<li>A:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>A:done</li>'), | |
} | |
dom.append('<li>subscribe observerA</li>'); | |
bridgeObservers.subscribe(observerA); | |
// observable.subscribe(observerA); | |
observable.subscribe(bridgeObservers); | |
const observerB = { | |
next: (item) => dom.append('<li>B:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>B:done</li>'), | |
} | |
setTimeout(() => { | |
// observable.subscribe(observerB); | |
dom.append('<li>subscribe observerB</li>'); | |
bridgeObservers.subscribe(observerB); | |
}, 3000); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// change bridge observer to subject | |
// change addObservers to subscribe | |
const dom = $('#operator'); | |
const observable = Rx.Observable.interval(1000).take(5); | |
const bridgeObservers = new Rx.Subject(); | |
const observerA = { | |
next: (item) => dom.append('<li>A:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>A:done</li>'), | |
} | |
dom.append('<li>subscribe observerA</li>'); | |
bridgeObservers.subscribe(observerA); | |
// observable.subscribe(observerA); | |
observable.subscribe(bridgeObservers); | |
const observerB = { | |
next: (item) => dom.append('<li>B:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>B:done</li>'), | |
} | |
setTimeout(() => { | |
// observable.subscribe(observerB); | |
dom.append('<li>subscribe observerB</li>'); | |
bridgeObservers.subscribe(observerB); | |
}, 3000); | |
const observerC = { | |
next: (item) => dom.append('<li>C:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>C:done</li>'), | |
} | |
setTimeout(() => { | |
dom.append('<li>subscribe observerC</li>'); | |
bridgeObservers.subscribe(observerC); | |
}, 6000); | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// remove observable | |
// use just subject (next, subscribe) | |
const dom = $('#operator'); | |
const observable = Rx.Observable.interval(1000).take(5); | |
const observerA = { | |
next: (item) => dom.append('<li>A:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>A:done</li>'), | |
} | |
observable.subscribe(observerA); | |
const observerB = { | |
next: (item) => dom.append('<li>B:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>B:done</li>'), | |
} | |
setTimeout(() => { | |
observable.subscribe(observerB); | |
}, 3000); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// change subject to behavior subject | |
const dom = $('#operator'); | |
const observable = Rx.Observable.interval(1000).take(3); | |
const subject = new Rx.BehaviorSubject(-1); | |
const observerA = { | |
next: (item) => dom.append('<li>A:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>A:done</li>'), | |
} | |
dom.append('<li>subscribe observerA</li>'); | |
subject.subscribe(observerA); | |
observable.subscribe(subject); | |
const observerB = { | |
next: (item) => dom.append('<li>B:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>B:done</li>'), | |
} | |
setTimeout(() => { | |
dom.append('<li>subscribe observerB</li>'); | |
subject.subscribe(observerB); | |
}, 4000); | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// change subject to behavior subject | |
const dom = $('#operator'); | |
const observable = Rx.Observable.interval(1000).take(3); | |
const subject = new Rx.ReplaySubject(2); | |
const observerA = { | |
next: (item) => dom.append('<li>A:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>A:done</li>'), | |
} | |
dom.append('<li>subscribe observerA</li>'); | |
subject.subscribe(observerA); | |
observable.subscribe(subject); | |
const observerB = { | |
next: (item) => dom.append('<li>B:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>B:done</li>'), | |
} | |
setTimeout(() => { | |
dom.append('<li>subscribe observerB</li>'); | |
subject.subscribe(observerB); | |
}, 4000); | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// change subject to behavior subject | |
const dom = $('#operator'); | |
const observable = Rx.Observable.interval(1000).take(3); | |
const subject = new Rx.AsyncSubject(); | |
const observerA = { | |
next: (item) => dom.append('<li>A:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>A:done</li>'), | |
} | |
dom.append('<li>subscribe observerA</li>'); | |
subject.subscribe(observerA); | |
observable.subscribe(subject); | |
const observerB = { | |
next: (item) => dom.append('<li>B:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>B:done</li>'), | |
} | |
setTimeout(() => { | |
dom.append('<li>subscribe observerB</li>'); | |
subject.subscribe(observerB); | |
}, 4000); | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// add multicast syntax with new subject | |
// connectable observable use connect method | |
const dom = $('#operator'); | |
const connectableObservable = Rx.Observable.interval(1000) | |
.take(5) | |
.multicast(new Rx.Subject()); | |
// const subject = new Rx.Subject(); | |
// setTimeout(() => { | |
connectableObservable.connect(); | |
// }, 2000); | |
const observerA = { | |
next: (item) => dom.append('<li>A:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>A:done</li>'), | |
} | |
dom.append('<li>subscribe observerA</li>'); | |
connectableObservable.subscribe(observerA); | |
// subject.subscribe(observerA); | |
// observable.subscribe(subject); | |
const observerB = { | |
next: (item) => dom.append('<li>B:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>B:done</li>'), | |
} | |
setTimeout(() => { | |
dom.append('<li>subscribe observerB</li>'); | |
// subject.subscribe(observerB); | |
connectableObservable.subscribe(observerB); | |
}, 3000); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// when unsubscribe per subscription or unsbuscribe about connectableObservable | |
// use do operator | |
const dom = $('#operator'); | |
const connectableObservable = Rx.Observable.interval(1000) | |
.do((x) => dom.append('<li> source: ' + x + '</li>')) | |
// .take(5) | |
.multicast(new Rx.Subject()); | |
let sub = connectableObservable.connect(); | |
const observerA = { | |
next: (item) => dom.append('<li>A:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>A:done</li>'), | |
} | |
dom.append('<li>subscribe observerA</li>'); | |
let subA = connectableObservable.subscribe(observerA); | |
const observerB = { | |
next: (item) => dom.append('<li>B:' + item + '</li>'), | |
error: (err) => dom.append(err), | |
complete: () => dom.append('<li>B:done</li>'), | |
} | |
let subB; | |
setTimeout(() => { | |
dom.append('<li>subscribe observerB</li>'); | |
subB = connectableObservable.subscribe(observerB); | |
}, 3000); | |
setTimeout(() => { | |
// subA.unsubscribe(); | |
// subB.unsubscribe(); | |
sub.unsubscribe(); | |
}, 4000); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!doctype html> | |
<html lang="en"> | |
<head> | |
<meta charset="utf-8"> | |
<title>GistRun</title> | |
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.0/Rx.js"></script> | |
<script src="https://code.jquery.com/jquery-2.2.4.min.js"></script> | |
<link rel="stylesheet" href="styles.css"> | |
</head> | |
<body> | |
<h1>Subjects and Multicasting</h1> | |
<div id="operator"></div> | |
<script src="11-publish.js"></script> | |
</body> | |
</html> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* todo: add styles */ | |
ol, | |
ul, li { | |
list-style: none; | |
margin: 0px; | |
padding: 0px; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment