Last active
October 1, 2016 14:14
-
-
Save devsr/5c6f3e5478a1773391d3eb12daa5789f to your computer and use it in GitHub Desktop.
Demonstration of RxJava issue #4630
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sandbox; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Phaser; | |
import java.util.concurrent.TimeUnit; | |
import rx.Observable; | |
import rx.Observer; | |
import rx.Scheduler; | |
import rx.internal.operators.OperatorFairScheduling; | |
import rx.internal.util.RxRingBuffer; | |
import rx.schedulers.Schedulers; | |
public class RxJavaFairTest { | |
public static void main(String[] args) throws Exception { | |
/* Executor and scheduler. */ | |
ExecutorService executor = Executors.newSingleThreadExecutor(); | |
Scheduler scheduler = Schedulers.from(executor); | |
/* The test observable. */ | |
Observable<Integer> test; | |
/* | |
* Test change code below to test various operators. Show currently is | |
* observeOn and delay. | |
* | |
* The range operator is a synchronous producer, however this behavior | |
* is repeatable with any 'fast-enough' producer. | |
*/ | |
test = Observable.range(0, RxRingBuffer.SIZE * 2).observeOn(scheduler).rebatchRequests(20); | |
// test = Observable.range(0, RxRingBuffer.SIZE * 2) | |
// .observeOn(scheduler).lift(new OperatorFairScheduling<>(scheduler, | |
// 20)); | |
// test = Observable.range(0, RxRingBuffer.SIZE * 2) | |
// .observeOn(scheduler); | |
// test = Observable.range(0, RxRingBuffer.SIZE * 2).delay(0, | |
// TimeUnit.MILLISECONDS, scheduler); | |
/* Using a phaser to ensure repeatable results. */ | |
Phaser sem = new Phaser(2); | |
/* | |
* Do subscribe from an arbitrary thread. Since the test observable is a | |
* range operator this thread will also be the initial producer. | |
* | |
* With ObserveOn this thread handles the first RxRingBuffer.SIZE | |
* emissions if the consumer is slow, otherwise it can handle all of | |
* them. With Delay this thread handles all the emissions. | |
*/ | |
new Thread(() -> doSubscribe(test, sem)).start(); | |
sem.arriveAndAwaitAdvance(); // Wait for phase 1. | |
/* | |
* Schedule something else. Since the observable is already emitting | |
* this task will not get run until the observable terminates. | |
*/ | |
executor.execute(() -> System.out.println("Executing something else.")); | |
System.out.println("Submitted something else."); | |
sem.arriveAndAwaitAdvance(); // Wait phase 2. | |
sem.awaitAdvance(sem.arriveAndDeregister()); // Wait phase 3 and done. | |
/* Clean-up. */ | |
executor.shutdown(); | |
executor.awaitTermination(30, TimeUnit.SECONDS); | |
} | |
private static void doSubscribe(Observable<Integer> observable, Phaser sem) { | |
observable.subscribe(new MyObserver(sem)); | |
System.out.println("doSubscribe done."); | |
} | |
private static void sleep(long millis) { | |
try { | |
Thread.sleep(millis); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
private static class MyObserver implements Observer<Object> { | |
private final Phaser sem; | |
private boolean first = true; | |
public MyObserver(Phaser sem) { | |
this.sem = sem; | |
} | |
@Override | |
public void onError(Throwable e) { | |
e.printStackTrace(); | |
} | |
@Override | |
public void onNext(Object t) { | |
if (first) { | |
first = false; | |
sem.arriveAndAwaitAdvance(); // Wait for phase 1; | |
sem.arriveAndAwaitAdvance(); // Wait for phase 2; | |
} | |
/* | |
* Try uncommenting sleep(int) below to slow down the consumer. | |
* Notice the change in behavior of the subscription thread when | |
* using the ObserveOn operator. | |
* | |
* The subscription thread either emits all items synchronously for | |
* a fast consumer, or the first RxRingBuffer.SIZE for a slow | |
* consumer and produces in RxRingBuffer.SIZE chunks in response to | |
* ObserveOn reactive-pull. | |
* | |
* Either way the outcome is same the executor is never yielded. | |
*/ | |
// sleep(10); | |
System.out.println("Consuming: " + t); | |
}; | |
@Override | |
public void onCompleted() { | |
sem.arriveAndDeregister(); // Arrive (no-wait) phase 3 and done. | |
System.out.println("Subscription done."); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment