Skip to content

Instantly share code, notes, and snippets.

@chris-horner
Created October 20, 2016 03:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chris-horner/3aa05e1f368efc78796fa1de938f850e to your computer and use it in GitHub Desktop.
Save chris-horner/3aa05e1f368efc78796fa1de938f850e to your computer and use it in GitHub Desktop.
import rx.Emitter;
import rx.Observable;
import rx.schedulers.Schedulers;
public final class EmitterTest {
public static void main(String[] args) {
Observable<Integer> obs = Observable.fromEmitter(emitter -> {
for (int i = 1; i < 1000; i++) {
if (i % 5 == 0) {
sleep(300L);
}
emitter.onNext(i);
}
emitter.onCompleted();
}, Emitter.BackpressureMode.LATEST);
obs.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128"
sleep(10000L);
}
private static void sleep(Long duration) {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment