Skip to content

Instantly share code, notes, and snippets.

@storkme
Created January 24, 2015 11:47
Show Gist options
  • Save storkme/25c38441658141b419f3 to your computer and use it in GitHub Desktop.
Save storkme/25c38441658141b419f3 to your computer and use it in GitHub Desktop.
Generate a 'sliding window' of values produced by a time-series source.
var Rx = require('rx');
/**
* Generate a 'sliding window' of the input time series. Please note that this function assumes that the observable
* input series is a linear time series, that is: `selector(n) < selector(n+1)`. Otherwise things will get weird.
*
* @param windowSizeMs - size of the sliding window in ms
* @param {function} [selector] - selector function that obtains the unix timestamp value for a given item
* @param {Rx.Scheduler} [scheduler] - optional scheduler
* @returns {Rx.Observable<T>}
*/
Rx.Observable.prototype.slidingWindow = function (windowSizeMs, selector, scheduler) {
Rx.helpers.isScheduler(scheduler) || (scheduler = Rx.Scheduler.timeout);
//if no selector is supplied, use a default one that assigns the current time to the item
typeof selector === 'function' || (selector = function () {
return scheduler.now();
});
var self = this;
return Rx.Observable.create(function (obs) {
var disposables = new Rx.CompositeDisposable(),
buf = [];
disposables.add(self.subscribe(function (next) {
var t = selector(next);
if (t > scheduler.now() - windowSizeMs) {
//if the new value is in our window, add it
buf = buf.concat(next);
//emit the new buffer
obs.onNext(buf);
//schedule a function to remove the oldest value when this value is due to expire.
var removeAt = t + windowSizeMs;
disposables.add(scheduler.scheduleWithAbsolute(removeAt, function () {
buf.splice(0, 1);
obs.onNext(buf);
}));
}
},
obs.onError.bind(obs),
function () {
disposables.dispose();
obs.onCompleted();
}));
return disposables;
});
};
// here's some test stuff.
var source = Rx.Observable.generateWithRelativeTime(
0,
function () {
//#cantstopwontstop
return true;
},
function (x) {
return x + 1;
},
function (x) {
return x;
},
function (x) {
//generate a new value at your leisure some time in the next 5 seconds pls
return Math.round(Math.random() * 5000);
});
source
.slidingWindow(30000)
.subscribe(function (result) {
console.log("there were " + result.length + " items in the last 30s");
},
function (err) {
console.error(err);
},
function () {
console.log("done");
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment