Created
April 7, 2016 10:19
-
-
Save flatmap13/97cc281a086f5a12c96ad36fe95d0393 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main; | |
import rx.Observable; | |
import rx.schedulers.Schedulers; | |
import java.io.IOException; | |
import java.util.concurrent.TimeUnit; | |
import java.util.stream.IntStream; | |
public class BackpressureDemo { | |
public static void main(String[] args) throws IOException { | |
// First we create a fast producing Observable | |
final Observable<Integer> fastProducer = Observable.create(observer -> | |
IntStream.range(0, 133701).forEach(observer::onNext) | |
); | |
// Uncomment this code block to witness the `MissingBackPressureException` | |
/* | |
fastProducer | |
.observeOn(Schedulers.newThread()) // we use observeOn to decouple producer from consumer (without this decoupling back-pressure is not needed) | |
.subscribe(BackpressureDemo::slowConsumer); // we subscribe to the fast producer with a (relatively) slow consumer | |
*/ | |
// Uncomment to see buffering back-pressure strategy in action | |
// Keep in mind that this introduces an unbounded buffer | |
/* | |
fastProducer | |
.onBackpressureBuffer() | |
.observeOn(Schedulers.newThread()) | |
.subscribe(BackpressureDemo::slowConsumer); | |
*/ | |
// Uncomment to see dropping back-pressure strategy in action | |
// Notice in the output that events will be dropped when back-pressure is applied | |
// Also notice that the output will reveal the internal default buffer size | |
/* | |
fastProducer | |
.onBackpressureDrop() | |
.observeOn(Schedulers.newThread()) | |
.subscribe(BackpressureDemo::slowConsumer); | |
*/ | |
/** Alternative ways of dealing with fast producers (without using back-pressure) by calming down the stream that goes into the slow consumer **/ | |
// throttle | |
/* | |
fastProducer | |
.throttleFirst(1, TimeUnit.MILLISECONDS) | |
.observeOn(Schedulers.newThread()) | |
.subscribe(BackpressureDemo::slowConsumer); | |
*/ | |
// debounce | |
/* | |
fastProducer | |
.debounce(100, TimeUnit.NANOSECONDS) | |
.observeOn(Schedulers.newThread()) | |
.subscribe(BackpressureDemo::slowConsumer); | |
*/ | |
// buffer | |
/* | |
fastProducer | |
.buffer(1000) | |
.observeOn(Schedulers.newThread()) | |
.subscribe(window -> System.out.println("Window: " + window)); | |
*/ | |
/** Another "option" is to make sure that the producer and consumer are not decoupled, this is also referred to as "blocking on the call-stack". **/ | |
/* | |
fastProducer.subscribe(BackpressureDemo::slowConsumer); | |
*/ | |
System.in.read(); | |
} | |
public static void slowConsumer(int i) { | |
// on most systems this is (relatively) slow enough, add more latency if back-pressure doesn't hit (e.g. Thread.sleep()) | |
System.out.println("> " + i); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment