Created
February 14, 2014 21:54
-
-
Save mattpodwysocki/9010149 to your computer and use it in GitHub Desktop.
Backpressure mechanisms in RxJS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Backpressure by batching: values produced by a 100 ms interval are grouped
// into arrays instead of being delivered one at a time.
var source = Rx.Observable.interval(100).bufferWithTimeOrCount(500 /* ms */, 50 /* items */);

source.subscribe(function (arr) {
  // Have chunked array based upon time or count, whichever hits first
});
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Backpressure by pausing: the `pauser` subject acts as the on/off switch for
// the stream. Pushing `true` starts it, pushing `false` pauses it.

// Create pauser
var pauser = new Rx.Subject();

// Create paused stream state
var source = Rx.Observable.interval(100).pausable(pauser);

// Subscribe
source.subscribe(function (next) {
  // Do whatever with next, or even pause
  pauser.onNext(false);
});

// Start the stream
pauser.onNext(true);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Backpressure by explicit request: the consumer tells the stream how many
// values it wants by pushing a count onto the `controller` subject.

// Attach controller to the observable
var source = Rx.Observable.interval(100).attachController(true);

// The controller of the Stream
var controller = new Rx.Subject();

// Get the controlled source bound by the subject
var controlledSource = source.controlledBy(controller);

// Subscribe
controlledSource.subscribe(function (next) {
  // Do whatever with next, such as get 3 more
  controller.onNext(3);
});

// Get three values
controller.onNext(3);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment