Skip to content

Instantly share code, notes, and snippets.

@trxcllnt
Last active August 29, 2015 14:13
Show Gist options
  • Save trxcllnt/1c277f25d2c0250f4ac3 to your computer and use it in GitHub Desktop.
Save trxcllnt/1c277f25d2c0250f4ac3 to your computer and use it in GitHub Desktop.
Testing Rx's currentThread and immediate scheduler schedule events appropriately even when requested in the future (and they have to block).
/* output:
legend:
current: Scheduled concurrently by the CurrentThread scheduler.
recursive: Scheduled recursively by the Immediate scheduler.
setTimeout: Scheduled asynchronously by the Timeout scheduler.
blocking-current: Scheduled to block concurrently by the CurrentThread scheduler.
blocking-recursive: Scheduled to block recursively by the Immediate scheduler.
setTimeout a 0
setTimeout b 0
setTimeout a 1
setTimeout b 1
setTimeout a 2
setTimeout b 2
current a 0
current b 0
current a 1
current b 1
current a 2
current b 2
recursive a 0
recursive a 1
recursive a 2
recursive b 0
recursive b 1
recursive b 2
blocking-current a for 250ms
blocking-current b for 750ms
blocking-current a 0
blocking-current a 1
blocking-current b 0
blocking-current a 2
blocking-current b 1
blocking-current b 2
blocking-recursive a for 750ms
blocking-recursive a 0
blocking-recursive a 1
blocking-recursive a 2
blocking-recursive b for 250ms
blocking-recursive b 0
blocking-recursive b 1
blocking-recursive b 2
*/
Function.prototype.composeR = function() {
var fns = [this].concat(slice.call(arguments));
return function(x) {
var i = fns.length;
while(--i > -1) {
x = fns[i](x);
}
return x;
};
}
var slice = Array.prototype.slice;
var concat = String.prototype.concat;
var A = concat.bind("a ");
var B = concat.bind("b ");
var cur = concat.bind("current ");
var rec = concat.bind("recursive ");
var sto = concat.bind("setTimeout ");
var bCur = concat.bind("blocking-current ");
var bRec = concat.bind("blocking-recursive ");
var log = console.log.bind(console);
var Rx = require('rx');
var to = 3;
var rCur = Rx.Observable.range(0, to, Rx.Scheduler.currentThread);
var rRec = Rx.Observable.range(0, to, Rx.Scheduler.immediate);
var rSto = Rx.Observable.interval(500).take(to);
var rBlk = function(x, dt, scheduler) {
return Rx.Observable.defer(function() {
console.log(x("for " + dt + "ms"));
return Rx.Observable.create(function(observer) {
return scheduler.scheduleRecursiveWithRelativeAndState(0, dt, function(state, self) {
observer.onNext(state);
if(++state < to) {
self(state, dt);
} else {
observer.onCompleted();
}
});
});
});
};
console.log("\nlegend:");
console.log(" current: Scheduled concurrently by the CurrentThread scheduler.");
console.log(" recursive: Scheduled recursively by the Immediate scheduler.");
console.log(" setTimeout: Scheduled asynchronously by the Timeout scheduler.");
console.log(" blocking-current: Scheduled to block concurrently by the CurrentThread scheduler.");
console.log(" blocking-recursive: Scheduled to block recursively by the Immediate scheduler.");
console.log("");
Rx.Observable.concat([
Rx.Observable.merge([
rSto.map(sto.composeR(A)),
rSto.map(sto.composeR(B)),
]),
Rx.Observable.merge([
rCur.map(cur.composeR(A)),
rCur.map(cur.composeR(B))
]),
Rx.Observable.merge([
rRec.map(rec.composeR(A)),
rRec.map(rec.composeR(B))
]),
Rx.Observable.merge([
rBlk(bCur.composeR(A), 250, Rx.Scheduler.currentThread).map(bCur.composeR(A)),
rBlk(bCur.composeR(B), 750, Rx.Scheduler.currentThread).map(bCur.composeR(B))
]),
Rx.Observable.merge([
rBlk(bRec.composeR(A), 750, Rx.Scheduler.immediate).map(bRec.composeR(A)),
rBlk(bRec.composeR(B), 250, Rx.Scheduler.immediate).map(bRec.composeR(B))
]),
]).subscribe(log, log)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment