Skip to content

Instantly share code, notes, and snippets.

@storkme
storkme / ugh.js
Last active August 29, 2015 14:08
Playing about with RxJs
var Rx = require('rx');
function doSomeAsyncThing(code) {
return Rx.Observable.return({
a: "product a",
b: "product b",
c: "product c",
d: "product d"
}[code]).delay(500 + (Math.random() * 5000));
}
@storkme
storkme / gist:43dec2d73bef5d804375
Last active August 29, 2015 14:13
Description of a workaround for reconnects in node-stomp-client
var Stomp = require('stomp-client');
var client = new Stomp(host, port, user, password, '1.1', vhost);
client.connect(onConnect, onError);
function onConnect(sessId) {
//subscribe to queues etc
}
@storkme
storkme / sliding-window.js
Created January 24, 2015 11:47
Generate a 'sliding window' of values produced by a time-series source.
var Rx = require('rx');
/**
* Generate a 'sliding window' of the input time series. Please note that this function assumes that the observable
* input series is a linear time series, that is: `selector(n) < selector(n+1)`. Otherwise things will get weird.
*
* @param windowSizeMs - size of the sliding window in ms
* @param {function} [selector] - selector function that obtains the unix timestamp value for a given item
* @param {Rx.Scheduler} [scheduler] - optional scheduler
* @returns {Rx.Observable<T>}
@storkme
storkme / using.js
Created February 3, 2015 22:29
Sample with Rx.Observable.using and node-mysql connection pooling
var source = Rx.Observable.using(
() => {
var conn,
disposable = Rx.Observable.fromNodeCallback(this.pool.getConnection, this.pool)()
.doOnNext((connection) => {
conn = connection
});
disposable.dispose = () => {
if (conn) {
@storkme
storkme / rx-socket.io.js
Last active August 29, 2015 14:16
tidy wrapper for socket.io bindings?
/**
* Uses `Rx.Observable.using` to provide resource-collected Observable stream that produces values
* whenever the websocket receives any of the named events passed in as arguments.
*
* @param {string[]} events - named events to bind listeners to
* @returns {Rx.Observable<Array>}
*/
bind(...events) {
return Rx.Observable.using(() => {
//keep a reference to listener functions so we can remove them later