Multicasting a cold, infinite Observable and using onBackpressureBuffer/Drop to handle overflow
import java.util.concurrent.CountDownLatch;

import rx.Observable;
import rx.observables.ConnectableObservable;
import rx.schedulers.Schedulers;

/**
 * This shows how a "reactive pull" compliant "cold" Observable, when multicasted, becomes "hot" and each
 * Subscriber must then choose its own strategy for handling overflow.
 */
public class MulticastColdInfiniteBackpressureExample {

    public static void main(String[] args) {
        final CountDownLatch latch = new CountDownLatch(2);

        // multicast a "cold" source
        ConnectableObservable<Integer> source = getData(1).publish();

        /**
         * This buffers, so it will get the first 2000 of the 5000 emitted.
         */
        source.onBackpressureBuffer().observeOn(Schedulers.computation())
                .map(i -> "one => " + i).take(2000).finallyDo(latch::countDown).forEach(System.out::println);

        /**
         * This drops, so it will receive the first 1024 (the size of the internal buffer), then pick up the
         * stream again once it can consume more, seeing large values like 159023.
         */
        source.onBackpressureDrop().observeOn(Schedulers.computation())
                .map(i -> "two => " + i).take(2000).finallyDo(latch::countDown).forEach(System.out::println);

        source.connect();

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Not actually infinite, but large enough to behave as such for this example.
     */
    public static Observable<Integer> getData(int id) {
        return Observable.range(id, Integer.MAX_VALUE).subscribeOn(Schedulers.computation());
    }
}
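The buffer-versus-drop distinction above can be sketched with plain `java.util` collections, independent of RxJava. This is a hypothetical illustration (the class and method names are not part of the gist): the "buffer" side keeps every item the consumer has not yet taken, like `onBackpressureBuffer`, while the "drop" side discards anything that overflows a bounded queue, loosely like `onBackpressureDrop`.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

public class OverflowStrategiesSketch {

    /**
     * Pushes `total` items at two consumers that are not keeping up, and
     * returns { size of the unbounded buffer, number of dropped items }.
     */
    static int[] simulate(int total, int capacity) {
        Queue<Integer> buffered = new ArrayDeque<>();                 // unbounded, buffer-style
        Queue<Integer> dropping = new ArrayBlockingQueue<>(capacity); // bounded, drop-style
        int dropped = 0;
        for (int i = 0; i < total; i++) {
            buffered.add(i);          // buffer everything the consumer has not consumed yet
            if (!dropping.offer(i)) { // bounded queue is full: the item is discarded
                dropped++;
            }
        }
        return new int[] { buffered.size(), dropped };
    }

    public static void main(String[] args) {
        int[] r = simulate(100, 8);
        // with 100 items and capacity 8: all 100 are buffered, 92 are dropped
        System.out.println("buffered = " + r[0] + ", dropped = " + r[1]);
    }
}
```

The trade-off mirrors the gist's two subscribers: buffering loses nothing but grows memory without bound if the consumer stays slow, while dropping keeps memory fixed at the cost of skipping values.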