Skip to content

Instantly share code, notes, and snippets.

@cwharris
Last active August 29, 2015 13:57
Show Gist options
  • Save cwharris/9543564 to your computer and use it in GitHub Desktop.
Save cwharris/9543564 to your computer and use it in GitHub Desktop.
An idea regarding back-pressure for iterable sources.
var Rx = require('rx'),
Ix = require('ix'),
log = console.log.bind(console);
// Example: feed the integers produced by Ix.Enumerable.range(0, 100) through
// Rx.Observable.iterate with a concurrency limit of 10, logging whatever the
// resulting observable emits.
//
// NOTE(review): this statement runs at load time, so Rx.Observable.iterate
// must already be assigned when it executes.

/**
 * Stand-in for real work: an empty observable passed through `delay` with a
 * random duration below 1000. The incoming item is not used.
 */
function simulateWork(x) {
  return Rx.Observable.empty().delay(Math.random() * 1000);
}

Rx.Observable
  .iterate(Ix.Enumerable.range(0, 100), 10, simulateWork)
  .subscribe(log);
/**
 * Pulls items from an enumerable on demand, keeping at most `maxConcurrent`
 * work observables running at once (a simple form of back-pressure).
 *
 * The first `maxConcurrent` items are pulled eagerly; after that, one more
 * item is pulled each time a work observable completes. The returned
 * observable emits each item at the moment its work is started. Values
 * produced by the work observables themselves are discarded
 * (`ignoreElements`); only their completion (or error) matters.
 *
 * The returned observable completes once the enumerator is exhausted AND
 * every started work observable has completed. Earlier revisions used a
 * `takeUntil(done)` signal that fired as soon as exhaustion was detected,
 * which disposed the still-running work observables; `expand` already
 * completes on its own when every branch has completed, so that signal has
 * been removed.
 *
 * @param {Object} enumerable - Source exposing `getEnumerator()`; the
 *   enumerator must provide `moveNext()` and `getCurrent()`.
 * @param {number} [maxConcurrent=1] - Maximum number of work observables in
 *   flight; any falsy value falls back to 1.
 * @param {function(*): Rx.Observable} workSelector - Maps an item to the
 *   observable representing the work for that item.
 * @returns {Rx.Observable} Observable of the source items, in the order their
 *   work is started.
 */
Rx.Observable.iterate = function (enumerable, maxConcurrent, workSelector) {
  return Rx.Observable.create(function (o) {
    var enumerator = enumerable.getEnumerator(),
        limit = maxConcurrent || 1,
        seeds = [],
        exhausted = false;

    // Advance the enumerator, remembering exhaustion so that moveNext() is
    // never invoked again after it has returned false (several branches
    // reach the end independently).
    function advance() {
      if (exhausted) {
        return false;
      }
      exhausted = !enumerator.moveNext();
      return !exhausted;
    }

    // Evaluated lazily on each subscription: yields the next item when one
    // is available, otherwise completes empty (the default else-branch of
    // Rx.Observable.if), which ends that branch of the expansion.
    var next = Rx.Observable.if(
      advance,
      Rx.Observable.defer(function () {
        return Rx.Observable.return(enumerator.getCurrent());
      })
    );

    // Seed the expansion with up to `limit` items so that this many work
    // observables start immediately.
    while (seeds.length < limit && advance()) {
      seeds.push(enumerator.getCurrent());
    }

    return Rx.Observable
      .fromArray(seeds)
      .expand(function (item) {
        // When the work for `item` completes, pull one replacement item.
        return workSelector(item)
          .ignoreElements()
          .concat(next);
      })
      .subscribe(o);
  });
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment