@mattpodwysocki
Created February 14, 2014 21:54
Backpressure mechanisms in RxJS
var source = Rx.Observable.interval(100).bufferWithTimeOrCount(500 /* ms */, 50 /* items */);
source.subscribe(function (arr) {
  // arr is a chunk of values, emitted after 500 ms or 50 items, whichever comes first
});
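The chunking idea above can be sketched without any library. This is a minimal, dependency-free illustration, not how RxJS implements `bufferWithTimeOrCount`: the helper name `createTimeOrCountBuffer` and its `push`/`flush`/`dispose` methods are hypothetical, and the sketch assumes that empty chunks are skipped and that the timer restarts after every flush.

```javascript
// Hypothetical helper, not part of RxJS: collects items and hands them to
// onFlush when either maxCount items have arrived or timeSpanMs has elapsed.
function createTimeOrCountBuffer(timeSpanMs, maxCount, onFlush) {
  var items = [];
  var timer = null;

  function startTimer() {
    timer = setTimeout(flush, timeSpanMs);
  }

  function flush() {
    clearTimeout(timer);
    if (items.length > 0) { // assumption: empty chunks are not emitted
      var chunk = items;
      items = [];
      onFlush(chunk);
    }
    startTimer(); // assumption: the time window restarts after each flush
  }

  startTimer();

  return {
    push: function (item) {
      items.push(item);
      if (items.length >= maxCount) { flush(); }
    },
    flush: flush,
    dispose: function () { clearTimeout(timer); timer = null; }
  };
}

// Usage: chunks of at most 2 items, or whatever arrived within 500 ms.
var chunks = [];
var buffer = createTimeOrCountBuffer(500, 2, function (arr) { chunks.push(arr); });
[1, 2, 3, 4, 5].forEach(function (item) { buffer.push(item); });
buffer.flush();   // force out the remainder instead of waiting 500 ms
buffer.dispose(); // stop the timer so the process can exit
```

A fast producer triggers the count limit and a slow one triggers the time limit, so the consumer handles at most one chunk per window either way.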
// Create pauser
var pauser = new Rx.Subject();
// Create the pausable stream, controlled by the pauser
var source = Rx.Observable.interval(100).pausable(pauser);
// Subscribe
source.subscribe(function (next) {
  // Do whatever with next, or even pause the stream
  pauser.onNext(false);
});
// Start the stream
pauser.onNext(true);
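The pause/resume behaviour above can be pictured as a gate in front of the subscriber. This is a minimal, dependency-free sketch under stated assumptions: `createPausableGate`, `setOpen` and `next` are hypothetical names, the gate starts closed (mirroring the explicit `pauser.onNext(true)` above), and values that arrive while the gate is closed are dropped rather than buffered.

```javascript
// Hypothetical helper, not part of RxJS: forwards values only while open.
function createPausableGate(onValue) {
  var open = false; // assumption: starts paused until explicitly opened
  return {
    setOpen: function (flag) { open = !!flag; },
    next: function (value) {
      if (open) { onValue(value); } // assumption: paused values are dropped
    }
  };
}

// Usage
var received = [];
var gate = createPausableGate(function (v) { received.push(v); });
gate.next(0);        // dropped, gate is still closed
gate.setOpen(true);  // start the stream
gate.next(1);
gate.next(2);
gate.setOpen(false); // pause
gate.next(3);        // dropped
gate.setOpen(true);  // resume
gate.next(4);
```

Dropping is the lossy form of backpressure. If no value may be lost, the gate would have to queue values while closed, trading memory for completeness.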
// Attach controller to the observable
var source = Rx.Observable.interval(100).attachController(true);
// The controller of the stream
var controller = new Rx.Subject();
// Get the controlled source bound by the controller
var controlledSource = source.controlledBy(controller);
// Subscribe
controlledSource.subscribe(function (next) {
  // Do whatever with next, such as request 3 more
  controller.onNext(3);
});
// Request the first three values
controller.onNext(3);
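The request-driven model above, where the consumer asks for n values at a time, can be sketched as a queue that releases items only against outstanding demand. This is a dependency-free illustration and not the implementation behind `attachController` or `controlledBy`: `createControlledQueue`, `next` and `request` are hypothetical names, and the sketch assumes unrequested values are queued without bound rather than dropped.

```javascript
// Hypothetical helper, not part of RxJS: delivers values only on request.
function createControlledQueue(onValue) {
  var queue = [];     // values produced but not yet requested
  var requested = 0;  // outstanding demand from the consumer

  function drain() {
    while (requested > 0 && queue.length > 0) {
      requested--;
      onValue(queue.shift());
    }
  }

  return {
    next: function (value) { queue.push(value); drain(); },
    request: function (n) { requested += n; drain(); }
  };
}

// Usage
var delivered = [];
var controlled = createControlledQueue(function (v) { delivered.push(v); });
[0, 1, 2, 3, 4].forEach(function (v) { controlled.next(v); }); // nothing delivered yet
controlled.request(3); // delivers 0, 1, 2
controlled.next(5);    // queued, because outstanding demand is 0
controlled.request(1); // delivers 3
```

Because demand is tracked as a counter, a request made before any value exists is remembered and satisfied as soon as values arrive.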