Skip to content

Instantly share code, notes, and snippets.

@flatmap13
Created April 7, 2016 10:19
Show Gist options
  • Save flatmap13/97cc281a086f5a12c96ad36fe95d0393 to your computer and use it in GitHub Desktop.
Save flatmap13/97cc281a086f5a12c96ad36fe95d0393 to your computer and use it in GitHub Desktop.
package main;
import rx.Observable;
import rx.schedulers.Schedulers;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class BackpressureDemo {
public static void main(String[] args) throws IOException {
// First we create a fast producing Observable
final Observable<Integer> fastProducer = Observable.create(observer ->
IntStream.range(0, 133701).forEach(observer::onNext)
);
// Uncomment this code block to witness the `MissingBackPressureException`
/*
fastProducer
.observeOn(Schedulers.newThread()) // we use observeOn to decouple producer from consumer (without this decoupling back-pressure is not needed)
.subscribe(BackpressureDemo::slowConsumer); // we subscribe to the fast producer with a (relatively) slow consumer
*/
// Uncomment to see buffering back-pressure strategy in action
// Keep in mind that this introduces an unbounded buffer
/*
fastProducer
.onBackpressureBuffer()
.observeOn(Schedulers.newThread())
.subscribe(BackpressureDemo::slowConsumer);
*/
// Uncomment to see dropping back-pressure strategy in action
// Notice in the output that events will be dropped when back-pressure is applied
// Also notice that the output will reveal the internal default buffer size
/*
fastProducer
.onBackpressureDrop()
.observeOn(Schedulers.newThread())
.subscribe(BackpressureDemo::slowConsumer);
*/
/** Alternative ways of dealing with fast producers (without using back-pressure) by calming down the stream that goes into the slow consumer **/
// throttle
/*
fastProducer
.throttleFirst(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
.subscribe(BackpressureDemo::slowConsumer);
*/
// debounce
/*
fastProducer
.debounce(100, TimeUnit.NANOSECONDS)
.observeOn(Schedulers.newThread())
.subscribe(BackpressureDemo::slowConsumer);
*/
// buffer
/*
fastProducer
.buffer(1000)
.observeOn(Schedulers.newThread())
.subscribe(window -> System.out.println("Window: " + window));
*/
/** Another "option" is to make sure that the producer and consumer are not decoupled, this is also referred to as "blocking on the call-stack". **/
/*
fastProducer.subscribe(BackpressureDemo::slowConsumer);
*/
System.in.read();
}
public static void slowConsumer(int i) {
// on most systems this is (relatively) slow enough, add more latency if back-pressure doesn't hit (e.g. Thread.sleep())
System.out.println("> " + i);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment