Skip to content

Instantly share code, notes, and snippets.

@wishfoundry
Last active June 3, 2016 19:44
Show Gist options
  • Save wishfoundry/ffe4f61c7acd276371d7 to your computer and use it in GitHub Desktop.
Save wishfoundry/ffe4f61c7acd276371d7 to your computer and use it in GitHub Desktop.
An implementation of Rx.ReplaySubject for cujo/most
var map = require("./collection").map;
// 2^53 - 1: the largest integer a double can represent exactly. Used below as the
// upper cap (and as the "no limit" default) for the buffer's size and time limits.
var MAX_SAFE = Math.pow(2, 53) - 1;
/**
 * Bounded buffer/cache: remembers the most recently added values, limited both by
 * count (`max`) and by age (`maxInterval`).
 *
 * Instances expose:
 *   - add(value)   store a value, stamped with the current time
 *   - forEach(fn)  call fn(value) for every retained value, oldest first
 *   - getAll()     return a new array of the retained values, oldest first
 *   - clearAll()   drop everything
 *
 * @param {number} [max] maximum number of values retained; missing, zero or
 *        non-numeric input falls back to 1
 * @param {number} [maxInterval] maximum age of a retained value, in the unit returned
 *        by nowGetter; missing, zero or non-numeric input means "no age limit"
 * @param {function} [nowGetter] returns the current time as a number; defaults to
 *        epoch milliseconds via `+new Date()`
 * @constructor
 */
function Buffer(max, maxInterval, nowGetter) {
    // All initialisation lives in the function assigned to prototype.constructor below.
    this.constructor(max, maxInterval, nowGetter);
}
Buffer.prototype.constructor = function(max, maxInterval, nowGetter) {
    // Normalise the limits; both are capped at MAX_SAFE.
    var maxItems = Math.min((Number(max) || 1), MAX_SAFE);
    var maxAge = Math.min((Number(maxInterval) || MAX_SAFE), MAX_SAFE);
    var getNow = nowGetter || function() {
        return +new Date();
    };
    // Entries are { value, interval } where `interval` is the time the value was added.
    var buffer = [];

    // true while an entry stamped at `interval` is younger than `window` at time `now`
    function isInWindow(interval, now, window) {
        return (now - interval) < window;
    }

    // ensure buffer is within size and time constraints
    function flush(now) {
        if (!buffer.length) {
            return;
        }
        var start = 0;
        // Skip expired entries at the front. The bound check matters: when every entry
        // has expired, we must stop at the end instead of reading buffer[buffer.length].
        while (start < buffer.length && !isInWindow(buffer[start].interval, now, maxAge)) {
            start++;
        }
        // Also drop whatever exceeds the size limit, keeping the newest entries.
        start = Math.max(start, (buffer.length - maxItems));
        if (start > 0) {
            buffer = buffer.slice(start);
        }
    }

    function enQueue(value) {
        var now = getNow();
        if (buffer.length < MAX_SAFE) {
            buffer.push({
                value : value,
                interval: now
            });
        }
        flush(now);
    }

    function each(fn) {
        flush(getNow());
        // Iterate a fixed snapshot so values added from inside fn are not visited
        // during this pass.
        var items = buffer;
        var count = items.length;
        for (var i = 0; i < count; i++) {
            fn(items[i].value);
        }
    }

    function values() {
        flush(getNow());
        return map(buffer, function(item) {
            return item.value;
        });
    }

    this.add = enQueue;
    this.forEach = each;
    this.clearAll = function() {
        buffer = [];
    };
    this.getAll = function() {
        return values();
    };
};
// The Buffer constructor is this module's only export.
module.exports = Buffer;
var most = require("../vendor/most");
var Buffer = require("./Buffer");
/**
 * A ReplayStream caches the events pushed into it and replays the cached events onto a
 * freshly forked stream whenever a new subscriber is added.
 *
 * ReplayStreams (a.k.a. ReplaySubjects) follow the "Subject" pattern: a Subject is both
 * an observable and an observer.
 * https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/replaysubject.md
 *
 * @param {number} maxBuffer forwarded to Buffer as its size limit
 * @param {number} maxInterval forwarded to Buffer as its age limit
 * @constructor
 */
function ReplayStream(maxBuffer, maxInterval) {
    // All initialisation lives in the function assigned to prototype.constructor below.
    this.constructor(maxBuffer, maxInterval);
}
ReplayStream.prototype.constructor = function(maxItems, maxInterval) {
    var cache = new Buffer(maxItems, maxInterval);
    // Handles into the underlying stream; assigned when the publisher below runs.
    var pushValue;
    var finish;
    var fail;

    var source = most.create(function(add, end, error) {
        pushValue = function(value) {
            // remember the value for future subscribers, then emit it live
            cache.add(value);
            add(value);
        };
        finish = end;
        fail = error;

        return function onEnd() {
            // Events arriving after the stream has ended are silently ignored.
            var noop = function noop() {};
            pushValue = noop;
            fail = noop;
            finish = noop;
            // The cache is still referenced from this closure, so it will not be
            // garbage collected; empty it explicitly instead.
            cache.clearAll();
        };
    });

    // Subjects are "hot" by definition: create demand right away so the stream is
    // initialised before anyone subscribes.
    source.drain();

    // Expose the "cold" view (cached values followed by the live stream) so callers can
    // configure it further before observing; observing is what forces a stream to init.
    function fork() {
        var replayed = most.from(cache.getAll());
        return replayed.concat(source);
    }

    // Different libraries (e.g. rxjs, kefirjs, baconjs, etc.) use different names for
    // the "subscribe" operation, so it is published under more than one name.
    function subscribe(onNext) {
        var forked = fork();
        return forked.observe(onNext);
    }

    this.observe = subscribe;
    this.forEach = subscribe;
    this.fork = fork;
    this.stream = fork;

    this.add = function(value, unsafeFast) {
        // Many observables have trouble resolving when fed within the same thread/loop
        // turn, so by default the push is deferred; unsafeFast skips the deferral as a
        // performance shortcut.
        if (!unsafeFast) {
            setTimeout(function() {
                pushValue(value);
            }, 0);
            return this;
        }
        pushValue(value);
        return this;
    };

    this.error = function(value) {
        fail(value);
        return this;
    };

    this.end = function() {
        finish();
        return this;
    };
};
module.exports = function create(maxBuffer, maxInterval) {
return new ReplayStream(maxBuffer, maxInterval);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment