Skip to content

Instantly share code, notes, and snippets.

@mattpodwysocki
Created July 18, 2012 05:16
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mattpodwysocki/3134356 to your computer and use it in GitHub Desktop.
Save mattpodwysocki/3134356 to your computer and use it in GitHub Desktop.
var subscription = new Rx.CompositeDisposable();
var observable = Rx.Observable.fromSocket('ws://localhost:8080', 'stock-protocol');
subscription.add(
observable.selectMany(function (value) {
var data = JSON.parse(value.data);
return Rx.Observable.fromArray(data.data);
}).groupBy(function (quote) {
return quote.symbol;
}).selectMany(function (stockStream) {
return stockStream.bufferWithCount(5, 1);
}, function (stockStream, w) {
return { stockStream: stockStream, window: w };
}).where(function (t) {
return t.window.length > 0;
}).select(function (t) {
var len = t.window.length, averageHigh = 0, averageLow = 0;
for (var i = 0; i < len; i++) {
averageHigh += parseInt(t.window[i].high);
averageLow += parseInt(t.window[i].low);
}
averageHigh = averageHigh / len;
averageLow = averageLow / len;
return {
symbol: t.stockStream.key,
averageHigh: averageHigh,
averageLow: averageLow
};
}).subscribe(function (x) {
console.log('Stock: ' + x.symbol + ' Average High: ' + x.averageHigh + ' Average Low: ' + x.averageLow);
}, function (err) {
console.log(err);
})
);
Rx.Observable.fromSocket = function (url, protocol) {
return Rx.Observable.create(function (observer) {
var socket = new WebSocket(url, protocol);
socket.onmessage = function (data) {
observer.onNext(data);
};
socket.onerror = function (err) {
observer.onError(err);
};
socket.onclose = function () {
observer.onCompleted();
};
return function () {
socket.close();
};
});
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment