Last active
December 23, 2015 06:59
-
-
Save cwharris/6598110 to your computer and use it in GitHub Desktop.
Free drink on checkin. `pendingPours` will be incremented each time a user checks into Four Square. :)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Rx = require('rx'), | |
Rx = require('./rx.helpers') | |
; | |
var FlowLogic = (function () { | |
Rx.Internals.inherits(FlowLogic, Rx.Observable); | |
// pendingPours is a behavior subject. | |
// solenoid is an observer. It represents the valve allowing drink flow. | |
// flowMeter is an observable. It represents how much drink is currently flowing. | |
// options is just a plain old object. | |
function FlowLogic (pendingPours, solenoid, flowMeter, options) { | |
var flowStatus = | |
flowMeter | |
.select(function (x) { return x > 127 ? 'open' : 'closed'}) | |
.distinctUntilChanged(); | |
var flowBegins = | |
flowStatus | |
.where(function (x) { return x === 'open'; }); | |
var flowEnds = | |
flowStatus | |
.throttle(1000) | |
.where(function (x) { return x === 'closed'; }); | |
function greaterThanZero (x) { return x > 0; }; | |
function decrement (pours) { return pours - 1; }; | |
this.subject = | |
pendingPours | |
.where(greaterThanZero) | |
.take(1) | |
.log('preparing pour. {{x}} pending pour(s).') | |
.map(decrement) | |
.onNextToObserver(pendingPours) | |
.log(' - opening solenoid') | |
.onNextToObserver(solenoid, 255) | |
.waitUntil(flowBegins) | |
.log(' - user started pouring') | |
.waitUntil( | |
Rx.Observable.amb( | |
flowEnds | |
.log(' - user stopped pouring'), | |
Rx.Observable.timer(options.pourTimeout, options.scheduler) | |
.log(' - pour timed out') | |
)) | |
.log(' - closing solenoid') | |
.onNextToObserver(solenoid, 0) // sideEffect | |
.delay(options.delayBetweenUsers, options.scheduler) | |
.log('pour complete. {{x}} pending pour(s) remain.') | |
.observeOn(Rx.Scheduler.timeout) | |
.repeat() // magics | |
.publish().refCount(); | |
}; | |
FlowLogic.prototype._subscribe = function (o) { | |
return this.subject.subscribe(o); | |
}; | |
return FlowLogic; | |
})(); | |
module.exports = FlowLogic; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Rx = require('rx') | |
; | |
// - Logging ----- | |
Rx.Observable.prototype.log = function () { | |
var args = arguments; | |
return this.doAction(function (x) { | |
console.log.apply(console, Array.prototype.map.call(args, function (value) { | |
return typeof value === 'string' | |
? value.replace("{{x}}", x) | |
: value; | |
})); | |
}); | |
}; | |
// - Side Effects ----- | |
Rx.Observable.prototype.onNextToObserver = function (observer, value) { | |
if (arguments.length == 2) { | |
return this.doAction(function (x) { | |
observer.onNext(value); | |
}); | |
} | |
return this.doAction(function (x) { | |
observer.onNext(x); | |
}); | |
}; | |
Rx.Observable.prototype.waitUntil = function (obs) { | |
return this.selectMany(function (x) { | |
return obs.take(1).select(function () { return x; }); | |
}); | |
}; | |
// - Time ----- | |
function timestampDifference (timestamps) { | |
return timestamps[1].timestamp - timestamps[0].timestamp; | |
} | |
Rx.Observable.prototype.frequency = function () { | |
return this | |
.timestamp() | |
.bufferWithCount(2, 1) | |
.select(timestampDifference); | |
}; | |
module.exports = Rx; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment