Skip to content

Instantly share code, notes, and snippets.

@eplawless
Created February 12, 2015 18:38
Show Gist options
  • Save eplawless/58c130d9b9da0eb92bf9 to your computer and use it in GitHub Desktop.
Save eplawless/58c130d9b9da0eb92bf9 to your computer and use it in GitHub Desktop.
The semantic I expected from .publish().refCount()
var Rx = require('rx');
function RefCountObservable(source) {
this._source = source;
}
RefCountObservable.prototype = Object.create(Rx.Observable.prototype);
RefCountObservable.prototype._refCount = 0;
RefCountObservable.prototype._subject = null;
RefCountObservable.prototype._sourceDisposable = null;
RefCountObservable.prototype._decrementRefCount = function() {
if (--this._refCount === 0) {
console.log('disposing source subscription');
var sourceDisposable = this._sourceDisposable;
if (sourceDisposable) {
sourceDisposable.dispose();
delete this._sourceDisposable;
}
delete this._subject;
}
}
RefCountObservable.prototype._incrementRefCount = function() {
if (++this._refCount === 1) {
console.log('creating source subscription');
var sourceDisposable = this._source.subscribe(this._subject);
if (this._refCount > 0) {
this._sourceDisposable = sourceDisposable;
}
}
}
RefCountObservable.prototype.subscribe = function(observer) {
var self = this;
var disposable = {
isDisposed: false,
dispose: function() {
if (this.isDisposed) return;
this.isDisposed = true;
self._decrementRefCount();
subjectSubscription && subjectSubscription.dispose();
subjectSubscription = null;
}
};
if (this._refCount === 0) {
this._subject = new Rx.Subject;
}
var subjectSubscription = this._subject.subscribe({
__proto__: observer,
onError: function(error) {
observer.onError(error);
disposable.dispose();
},
onCompleted: function() {
observer.onCompleted();
disposable.dispose();
}
});
this._incrementRefCount();
return disposable;
}
RefCountObservable.prototype.refCount = function() { return this; }
Rx.Observable.prototype.refCount = function() {
return new RefCountObservable(this);
}
function createObserver(tag) {
return {
onNext: console.warn.bind(console, tag + ' onNext:'),
onError: console.warn.bind(console, tag + ' onError:'),
onCompleted: console.warn.bind(console, tag + ' onCompleted')
};
}
var timer = Rx.Observable.timer(300).map(function() { return 'timed value' });
var rcTimer = timer.refCount();
var value = Rx.Observable.returnValue('just value');
var rcValue = value.refCount();
console.warn('observer A subscribing to rcValue');
rcValue.subscribe(createObserver('A'));
console.warn('observer B subscribing to rcValue');
rcValue.subscribe(createObserver('B'));
console.log('----------------------------------------')
console.warn('observer C subscribing to rcTimer');
rcTimer.subscribe(createObserver('C'));
console.warn('observer D subscribing to rcTimer');
rcTimer.subscribe(createObserver('D'));
@eplawless
Copy link
Author

observer A subscribing to rcValue
creating source subscription
A onNext: just value
A onCompleted
disposing source subscription
observer B subscribing to rcValue
creating source subscription
B onNext: just value
B onCompleted
disposing source subscription
----------------------------------------
observer C subscribing to rcTimer
creating source subscription
observer D subscribing to rcTimer
C onNext: timed value
D onNext: timed value
C onCompleted
D onCompleted
disposing source subscription

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment