Created
July 15, 2016 22:43
-
-
Save mmv/8c3ef5eb9d157c1a2923f3060dfac954 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const INITIAL_CREDIT = 3; // <-- try changing to a value like 30 | |
// <-- to see the difference | |
const ITEMS_TO_PROCESS = new Array(10); | |
var count = 0; // number of items being processed | |
var flowControl = new Rx.Subject(); | |
Rx.Observable.create(function(obs) { | |
Rx.Observable.fromArray(ITEMS_TO_PROCESS) | |
.zip(flowControl) // zipping with the flow control tokens | |
// makes this stream advance only as much | |
// as those tokens flow | |
.subscribe( | |
x_ => obs.onNext(x_[0]), // then we discard the tokens | |
// and push just our items down | |
// the stream | |
_ => {}, | |
() => obs.onCompleted()); | |
}) | |
// and then the "heavy processing" stage | |
.do(_ => { count++; msg("items being processed: " + count); }) | |
.selectMany(x => Rx.Observable.fromPromise(processItem(x))) | |
.do(_ => { count--; msg("items being processed: " + count); }) | |
// after the "heavy processing" we just throw more tokens | |
// into the flow control stream, so we keep the processing going | |
.do(_ => flowControl.onNext(null) ) | |
.subscribe(); | |
// setup of initial tokens | |
// we can start with a few of them but it's important we don't | |
// signal stream completion into the token stream, otherwise we | |
// abort the whole processing. | |
// if we'll have at most a single token | |
// this could be replaced by just `flowControl.onNext(null)` | |
Rx.Observable.range(0,INITIAL_CREDIT) | |
.concat(Rx.Observable.never()) | |
.subscribe(flowControl); | |
function msg(m) { | |
console.log((new Date()).toISOString() + " " + m); | |
} | |
// simulate long-running processing where items end up waiting | |
// for one another | |
function processItem(x) { | |
return new Promise((resolve,reject) => window.setTimeout(resolve, (count+1)* 500)); | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment