Skip to content

Instantly share code, notes, and snippets.

@cwharris
Last active December 23, 2015 06:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cwharris/6598110 to your computer and use it in GitHub Desktop.
Save cwharris/6598110 to your computer and use it in GitHub Desktop.
Free drink on check-in. `pendingPours` is incremented each time a user checks in on Foursquare. :)
var Rx = require('rx'),
Rx = require('./rx.helpers')
;
// FlowLogic is an observable that runs one pour cycle at a time: it waits for
// a pending pour, opens the solenoid, waits for the pour to start and then to
// end (or time out), closes the solenoid, pauses, and repeats.
// Subscribers are attached to the shared pipeline stored in `this.subject`.
var FlowLogic = (function () {
Rx.Internals.inherits(FlowLogic, Rx.Observable);
// Constructor arguments:
// pendingPours is a behavior subject. It is read for the current count and
//   written back to (decremented) when a pour is claimed.
// solenoid is an observer. It represents the valve allowing drink flow.
//   This block sends it 255 to open and 0 to close.
// flowMeter is an observable. It represents how much drink is currently flowing.
// options is just a plain old object. The fields read here are pourTimeout,
//   delayBetweenUsers and scheduler.
function FlowLogic (pendingPours, solenoid, flowMeter, options) {
// Reduce raw meter readings to 'open' (reading above 127) or 'closed', and
// only emit when that status changes.
var flowStatus =
flowMeter
.select(function (x) { return x > 127 ? 'open' : 'closed'})
.distinctUntilChanged();
// Emits each time the status becomes 'open'.
var flowBegins =
flowStatus
.where(function (x) { return x === 'open'; });
// Emits 'closed' statuses that pass throttle(1000).
// NOTE(review): presumably 1000 is milliseconds and this filters out brief
// interruptions in flow; confirm against the throttle semantics of the Rx
// version in use.
var flowEnds =
flowStatus
.throttle(1000)
.where(function (x) { return x === 'closed'; });
function greaterThanZero (x) { return x > 0; };
function decrement (pours) { return pours - 1; };
// The pour cycle. log, onNextToObserver and waitUntil are the helpers that
// ./rx.helpers adds to Rx.Observable.prototype.
this.subject =
pendingPours
// Claim a single pending pour: take the first count above zero...
.where(greaterThanZero)
.take(1)
.log('preparing pour. {{x}} pending pour(s).')
// ...and write the decremented count back into pendingPours.
.map(decrement)
.onNextToObserver(pendingPours)
.log(' - opening solenoid')
.onNextToObserver(solenoid, 255)
// Hold the value until the flow meter reports the pour has started.
.waitUntil(flowBegins)
.log(' - user started pouring')
// Then hold it until whichever happens first: the flow ends, or the
// options.pourTimeout timer fires.
.waitUntil(
Rx.Observable.amb(
flowEnds
.log(' - user stopped pouring'),
Rx.Observable.timer(options.pourTimeout, options.scheduler)
.log(' - pour timed out')
))
.log(' - closing solenoid')
.onNextToObserver(solenoid, 0) // side effect: sends 0 to the solenoid
// Pause between pours before reporting completion.
.delay(options.delayBetweenUsers, options.scheduler)
.log('pour complete. {{x}} pending pour(s) remain.')
// NOTE(review): presumably observeOn(timeout) makes each repeat start
// asynchronously instead of re-entering synchronously; confirm.
.observeOn(Rx.Scheduler.timeout)
.repeat() // resubscribe so the cycle starts over for the next pending pour
// Share a single running pipeline among all subscribers.
.publish().refCount();
};
// Subscribing to a FlowLogic instance subscribes to the shared pipeline.
FlowLogic.prototype._subscribe = function (o) {
return this.subject.subscribe(o);
};
return FlowLogic;
})();
module.exports = FlowLogic;
var Rx = require('rx')
;
// - Logging -----
// Logs to the console every time the source emits, leaving the emitted
// values untouched (the logging is attached with doAction).
//
// Arguments: any number of values to hand to console.log on each emission.
//   String arguments are templates: every "{{x}}" placeholder in them is
//   replaced with the emitted value. Non-string arguments are passed through
//   as they are.
// Returns: the observable produced by doAction on the source.
Rx.Observable.prototype.log = function () {
  var args = arguments;
  return this.doAction(function (x) {
    // A replacer function is used instead of a replacement string so that
    // "$" sequences inside an emitted value (e.g. "$&") are inserted
    // literally rather than being interpreted as substitution patterns.
    function emittedValue () { return x; }
    console.log.apply(console, Array.prototype.map.call(args, function (value) {
      // The global flag substitutes every placeholder, not just the first.
      return typeof value === 'string'
        ? value.replace(/\{\{x\}\}/g, emittedValue)
        : value;
    }));
  });
};
// - Side Effects -----
// Pushes a value into `observer` as a side effect of every source emission,
// leaving the source values untouched.
//
// observer: the object whose onNext is called on each emission.
// value: optional. When the method is called with exactly two arguments,
//   this fixed value is sent each time; otherwise the emitted value itself
//   is forwarded.
// Returns: the observable produced by doAction on the source.
Rx.Observable.prototype.onNextToObserver = function (observer, value) {
  // Decided once, at call time, from the number of arguments supplied.
  var sendsFixedValue = arguments.length == 2;
  return this.doAction(function (emitted) {
    observer.onNext(sendsFixedValue ? value : emitted);
  });
};
// Holds each source value back until `obs` emits, then passes the held value
// on unchanged.
//
// obs: the observable whose first emission (after the source value arrives)
//   releases that value.
// Returns: the observable produced by selectMany on the source.
Rx.Observable.prototype.waitUntil = function (obs) {
  function releaseOnSignal (held) {
    var firstSignal = obs.take(1);
    // The signal's own value is discarded; only the held value continues.
    return firstSignal.select(function () {
      return held;
    });
  }
  return this.selectMany(releaseOnSignal);
};
// - Time -----
// Returns how much later the second entry's `timestamp` is than the first's.
//
// timestamps: array whose first two entries each carry a `timestamp` property.
// Returns: timestamps[1].timestamp minus timestamps[0].timestamp.
function timestampDifference (timestamps) {
  var earlier = timestamps[0];
  var later = timestamps[1];
  return later.timestamp - earlier.timestamp;
}
// Emits, for every pair of consecutive source values, the difference between
// their timestamps (as attached by timestamp()), i.e. the interval between
// the two emissions.
//
// Returns: an observable of timestamp differences, one per consecutive pair.
//   A source that emits fewer than two values produces no differences.
Rx.Observable.prototype.frequency = function () {
  return this
    .timestamp()
    .bufferWithCount(2, 1)
    // bufferWithCount(2, 1) flushes a trailing one-element buffer when the
    // source completes. Passing that to timestampDifference would throw and
    // end the stream with an error, so only full pairs are let through.
    .where(function (pair) { return pair.length === 2; })
    .select(timestampDifference);
};
module.exports = Rx;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment