Skip to content

Instantly share code, notes, and snippets.

@mmv
Created July 15, 2016 22:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mmv/8c3ef5eb9d157c1a2923f3060dfac954 to your computer and use it in GitHub Desktop.
Save mmv/8c3ef5eb9d157c1a2923f3060dfac954 to your computer and use it in GitHub Desktop.
// Number of flow-control tokens put into circulation up front.
const INITIAL_CREDIT = 3; // <-- try changing to a value like 30
                          // <-- to see the difference
// Ten placeholder items (every entry is `undefined`). Array.from yields a
// dense array, unlike `new Array(10)`, which only sets the length and
// leaves holes that some iteration helpers skip.
const ITEMS_TO_PROCESS = Array.from({ length: 10 });
// Number of items being processed; reassigned as items enter and leave
// the processing stage, hence `let`.
let count = 0;
// Stream of flow-control tokens that paces the item stream.
const flowControl = new Rx.Subject();
// Flow-controlled pipeline: an item is released downstream only when a
// token arrives on `flowControl`, and every finished item puts a token
// back, which keeps the processing going.
//
// The items are zipped with the token stream directly instead of being
// re-emitted through a hand-written Rx.Observable.create wrapper: that way
// errors are propagated rather than swallowed, and disposing the final
// subscription also disposes the zip.
Rx.Observable.fromArray(ITEMS_TO_PROCESS)
  // zipping with the flow control tokens makes this stream advance only
  // as much as those tokens flow; the result selector discards the token
  // and pushes just our item down the stream
  .zip(flowControl, (item, _token) => item)
  // and then the "heavy processing" stage
  .do(() => { count++; msg("items being processed: " + count); })
  .selectMany(x => Rx.Observable.fromPromise(processItem(x)))
  .do(() => { count--; msg("items being processed: " + count); })
  // after the "heavy processing" we just throw more tokens
  // into the flow control stream, so we keep the processing going
  .do(() => flowControl.onNext(null))
  .subscribe();
// Seed the token stream with the initial credit.
// We may start with several tokens, but the seed must never complete:
// a completion signal forwarded into the token stream would abort the
// whole processing, so the finite range is followed by an observable
// that never ends.
// With at most a single token, all of this could be replaced by just
// `flowControl.onNext(null)`.
const initialTokens = Rx.Observable.range(0, INITIAL_CREDIT);
const openEndedTokens = initialTokens.concat(Rx.Observable.never());
openEndedTokens.subscribe(flowControl);
/**
 * Writes a message to the console, prefixed with the current time as an
 * ISO-8601 timestamp and a single space.
 *
 * @param {*} m - The message; it is string-concatenated after the timestamp.
 */
function msg(m) {
  const timestamp = new Date().toISOString();
  // Plain `+` concatenation keeps the coercion of non-string arguments
  // exactly as it was.
  const line = timestamp + " " + m;
  console.log(line);
}
/**
 * Simulates long-running processing where items end up waiting for one
 * another: the delay is derived from `count`, the number of items being
 * processed, as read at the moment of the call.
 *
 * @param {*} x - The item to process (not used by the simulation).
 * @returns {Promise<undefined>} Resolves after `(count + 1) * 500` ms.
 */
function processItem(x) {
  const delayMs = (count + 1) * 500;
  // The global setTimeout is used instead of window.setTimeout so the
  // helper is not tied to a browser `window` object.
  return new Promise(resolve => setTimeout(resolve, delayMs));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment