Skip to content

Instantly share code, notes, and snippets.

@devsr
Last active October 1, 2016 14:14
Show Gist options
  • Save devsr/5c6f3e5478a1773391d3eb12daa5789f to your computer and use it in GitHub Desktop.
Save devsr/5c6f3e5478a1773391d3eb12daa5789f to your computer and use it in GitHub Desktop.
Demonstration of RxJava issue #4630
package sandbox;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.internal.operators.OperatorFairScheduling;
import rx.internal.util.RxRingBuffer;
import rx.schedulers.Schedulers;
/**
 * Demonstration of RxJava issue #4630.
 *
 * <p>An observable is consumed on a scheduler backed by a single-threaded
 * executor. While the observable is emitting, an unrelated task is submitted
 * to the same executor. The console output shows when that task actually gets
 * to run relative to the "Consuming: ..." lines.
 *
 * <p>A two-party {@link Phaser} (main thread + observer) makes the ordering
 * repeatable: the extra task is only submitted once the observer has received
 * its first item.
 */
public class RxJavaFairTest {

    /**
     * Runs the demonstration and prints its progress to standard output.
     *
     * @param args ignored
     * @throws Exception if the main thread is interrupted while waiting for
     *         the executor to terminate, or if setting up the pipeline fails
     */
    public static void main(String[] args) throws Exception {
        /* Executor and scheduler. */
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Scheduler scheduler = Schedulers.from(executor);

            /* The test observable. */
            Observable<Integer> test;

            /*
             * Change the code below to test various operators. Shown
             * currently are observeOn and delay.
             *
             * The range operator is a synchronous producer, however this
             * behavior is repeatable with any 'fast-enough' producer.
             */
            test = Observable.range(0, RxRingBuffer.SIZE * 2).observeOn(scheduler).rebatchRequests(20);
            // test = Observable.range(0, RxRingBuffer.SIZE * 2)
            // .observeOn(scheduler).lift(new OperatorFairScheduling<>(scheduler,
            // 20));
            // test = Observable.range(0, RxRingBuffer.SIZE * 2)
            // .observeOn(scheduler);
            // test = Observable.range(0, RxRingBuffer.SIZE * 2).delay(0,
            // TimeUnit.MILLISECONDS, scheduler);

            /* Using a phaser to ensure repeatable results. */
            Phaser sem = new Phaser(2);

            /*
             * Do subscribe from an arbitrary thread. Since the test observable
             * is a range operator this thread will also be the initial
             * producer.
             *
             * With ObserveOn this thread handles the first RxRingBuffer.SIZE
             * emissions if the consumer is slow, otherwise it can handle all
             * of them. With Delay this thread handles all the emissions.
             */
            new Thread(() -> doSubscribe(test, sem)).start();
            sem.arriveAndAwaitAdvance(); // Wait for phase 1.

            /*
             * Schedule something else. Since the observable is already
             * emitting this task will not get run until the observable
             * terminates.
             */
            executor.execute(() -> System.out.println("Executing something else."));
            System.out.println("Submitted something else.");
            sem.arriveAndAwaitAdvance(); // Wait for phase 2.
            sem.awaitAdvance(sem.arriveAndDeregister()); // Wait for phase 3 and done.
        } finally {
            /*
             * Clean-up. Done in a finally block so that a failure above does
             * not leave the non-daemon executor thread keeping the JVM alive.
             */
            executor.shutdown();
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                System.err.println("Executor did not terminate within 30 seconds.");
            }
        }
    }

    /**
     * Subscribes a new {@link MyObserver} to the given observable on the
     * calling thread and prints a message once {@code subscribe} returns.
     *
     * @param observable the observable under test
     * @param sem the phaser shared with the main thread
     */
    private static void doSubscribe(Observable<Integer> observable, Phaser sem) {
        observable.subscribe(new MyObserver(sem));
        System.out.println("doSubscribe done.");
    }

    /**
     * Sleeps for the given time. Only referenced from the optional
     * "slow consumer" line in {@link MyObserver#onNext(Object)}.
     *
     * <p>If the thread is interrupted the stack trace is printed and the
     * interrupt status is restored so callers further up can still see it.
     *
     * @param millis time to sleep, in milliseconds
     */
    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Observer that prints every item and synchronizes with the main thread
     * through the shared phaser: it waits for phases 1 and 2 on the first
     * item, and deregisters when the stream terminates (normally or with an
     * error).
     */
    private static final class MyObserver implements Observer<Object> {

        /** Phaser shared with the main thread. */
        private final Phaser sem;

        /** True until the first item has been seen. */
        private boolean first = true;

        public MyObserver(Phaser sem) {
            this.sem = sem;
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
            /*
             * An error is terminal, so onCompleted will not follow. Give up
             * our phaser registration here as well, otherwise the main thread
             * would wait forever for a party that never arrives.
             */
            sem.arriveAndDeregister();
        }

        @Override
        public void onNext(Object t) {
            if (first) {
                first = false;
                sem.arriveAndAwaitAdvance(); // Wait for phase 1.
                sem.arriveAndAwaitAdvance(); // Wait for phase 2.
            }
            /*
             * Try uncommenting sleep(int) below to slow down the consumer.
             * Notice the change in behavior of the subscription thread when
             * using the ObserveOn operator.
             *
             * The subscription thread either emits all items synchronously
             * for a fast consumer, or the first RxRingBuffer.SIZE for a slow
             * consumer and produces in RxRingBuffer.SIZE chunks in response
             * to ObserveOn reactive-pull.
             *
             * Either way the outcome is the same: the executor is never
             * yielded.
             */
            // sleep(10);
            System.out.println("Consuming: " + t);
        }

        @Override
        public void onCompleted() {
            sem.arriveAndDeregister(); // Arrive (no-wait) phase 3 and done.
            System.out.println("Subscription done.");
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment