Skip to content

Instantly share code, notes, and snippets.

@farukonder
Created December 17, 2018 22:18
Show Gist options
  • Save farukonder/e709a10c58603e59faee9e505ec9ea81 to your computer and use it in GitHub Desktop.
Save farukonder/e709a10c58603e59faee9e505ec9ea81 to your computer and use it in GitHub Desktop.
package com.github.farukonder.experimenting.frp.rxjava2.simple;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
/**
 * Demonstrates which thread each stage of a three-level nested RxJava 2 pipeline runs on.
 *
 * <p>Every level builds an {@code Observable.create} source that emits two items, flat-maps each
 * item into the next level, and is configured with its own {@code subscribeOn} and
 * {@code observeOn} executor. Each stage prints a line containing the stage name and the name of
 * the thread executing it, so the console output shows how work hops between the six pools and,
 * finally, the calling thread (via {@code blockingSubscribe}).
 */
public class Schedulers3LevelBlocking {

    /** Number of items every {@code Observable.create} source emits before completing. */
    private static final int ITEMS_PER_SOURCE = 2;

    /** Precompiled so the whitespace-collapsing in {@link #getlogg} does not recompile per line. */
    private static final Pattern WHITESPACE_RUN = Pattern.compile("\\s+");

    // One observeOn pool and one subscribeOn pool per nesting level; the thread-name prefix is
    // what makes the pool visible in the printed log lines.
    static final ExecutorService observeOn_1 = Executors.newFixedThreadPool(5, new WorkerThreadFactory("observeOn1"));
    static final ExecutorService subscribeOn_1 = Executors.newFixedThreadPool(5, new WorkerThreadFactory("subscribeOn1"));
    static final ExecutorService observeOn_2 = Executors.newFixedThreadPool(5, new WorkerThreadFactory("observeOn2"));
    static final ExecutorService subscribeOn_2 = Executors.newFixedThreadPool(5, new WorkerThreadFactory("subscribeOn2"));
    static final ExecutorService observeOn_3 = Executors.newFixedThreadPool(5, new WorkerThreadFactory("observeOn3"));
    static final ExecutorService subscribeOn_3 = Executors.newFixedThreadPool(5, new WorkerThreadFactory("subscribeOn3"));

    /**
     * Formats a log line with an empty trailing field.
     *
     * @param operation  name of the pipeline stage being logged
     * @param threadName name of the thread the stage ran on
     * @param values     free-form description of the values involved
     * @return the formatted line, as produced by {@link #getlogg} with {@code ""} as last field
     */
    static String getlog(String operation, String threadName, String values) {
        return getlogg(operation, threadName, values, "");
    }

    /**
     * Formats a four-column, comma-separated log line.
     *
     * <p>The first three columns are right-padded to 10, 15 and 25 characters. The fourth column
     * is appended unpadded, with every run of whitespace collapsed to a single space so that
     * already-padded lines from inner levels can be nested without growing unboundedly.
     *
     * @param operation  name of the pipeline stage being logged
     * @param threadName name of the thread the stage ran on
     * @param values     free-form description of the values involved
     * @param i          trailing field, typically the item (or nested log line) being processed
     * @return the formatted line
     */
    static String getlogg(String operation, String threadName, String values, String i) {
        return padRight(operation, 10) + ", "
                + padRight(threadName, 15) + ", "
                + padRight(values, 25) + ", "
                + WHITESPACE_RUN.matcher(i).replaceAll(" ");
    }

    /**
     * Builds the second-level stream for one first-level item.
     *
     * @param threadName name of the thread that invoked this method (logged, not used otherwise)
     * @param v1         the first-level item this stream belongs to
     * @return a stream of "map2" log lines, one per third-level item
     */
    static Observable<String> flatMapV2(String threadName, Object v1) {
        System.out.println(getlog("flatMapV2", threadName, "v1: " + v1));
        return Observable.<String>create(emitter -> {
                    for (int count = 1; count <= ITEMS_PER_SOURCE; count++) {
                        System.out.println(getlog("create2", Thread.currentThread().getName(),
                                "v1: " + v1 + " count: " + count));
                        emitter.onNext("" + count);
                    }
                    emitter.onComplete();
                })
                // flatMap sits above observeOn, so the mapper runs on the thread that emits.
                .<String>flatMap(v2 -> flatMapV3(Thread.currentThread().getName(), v1, v2))
                .observeOn(Schedulers.from(observeOn_2))
                .subscribeOn(Schedulers.from(subscribeOn_2))
                .doOnNext(i -> System.out.println(
                        getlogg("doOnNext2", Thread.currentThread().getName(), "v1: " + v1, " i: " + i)))
                .map(i -> getlogg("map2", Thread.currentThread().getName(), "", i));
    }

    /**
     * Builds the third-level (innermost) stream for one second-level item.
     *
     * @param threadName name of the thread that invoked this method (logged, not used otherwise)
     * @param v1         the first-level item this stream belongs to
     * @param v2         the second-level item this stream belongs to
     * @return a stream of "map3" log lines, one per emitted item
     */
    static Observable<String> flatMapV3(String threadName, Object v1, Object v2) {
        System.out.println(getlog("flatMapV3", threadName, "v1: " + v1 + " v2: " + v2));
        return Observable.<String>create(emitter -> {
                    for (int count = 1; count <= ITEMS_PER_SOURCE; count++) {
                        System.out.println(getlog("create3", Thread.currentThread().getName(),
                                "v1: " + v1 + " v2: " + v2 + " count: " + count));
                        emitter.onNext("" + count);
                    }
                    emitter.onComplete();
                })
                .observeOn(Schedulers.from(observeOn_3))
                .subscribeOn(Schedulers.from(subscribeOn_3))
                .doOnNext(i -> System.out.println(
                        getlogg("doOnNext3", Thread.currentThread().getName(),
                                "v1: " + v1 + " v2: " + v2, " i: " + i)))
                .map(i -> getlogg("map3", Thread.currentThread().getName(), "", i));
    }

    /**
     * Runs the three-level pipeline, printing every stage, and blocks until it completes.
     *
     * <p>The executors use non-daemon threads, so they are shut down in a {@code finally} block:
     * otherwise a failure inside the pipeline would leave the JVM running forever.
     *
     * @param args ignored
     * @throws InterruptedException declared for signature compatibility
     */
    public static void main(String[] args) throws InterruptedException {
        try {
            Observable.<Integer>create(emitter -> {
                        for (int count = 1; count <= ITEMS_PER_SOURCE; count++) {
                            System.out.println(getlog("create1", Thread.currentThread().getName(),
                                    "v1: " + count));
                            emitter.onNext(count);
                        }
                        emitter.onComplete();
                    })
                    .<String>flatMap(v -> flatMapV2(Thread.currentThread().getName(), v))
                    .observeOn(Schedulers.from(observeOn_1))
                    .subscribeOn(Schedulers.from(subscribeOn_1))
                    .doOnNext(i -> System.out.println(
                            getlogg("doOnNext", Thread.currentThread().getName(), "", " i: " + i)))
                    .map(i -> getlogg("map1", Thread.currentThread().getName(), "", i))
                    // blockingSubscribe delivers items on the calling thread.
                    .blockingSubscribe(line -> System.out.println(
                            getlogg("bsubscribe", Thread.currentThread().getName(), "", " s: " + line)));
        } finally {
            shutdownAll();
        }
    }

    /** Requests an orderly shutdown of all six demo executors. */
    private static void shutdownAll() {
        ExecutorService[] pools = {
            observeOn_1, subscribeOn_1, observeOn_2, subscribeOn_2, observeOn_3, subscribeOn_3
        };
        for (ExecutorService pool : pools) {
            pool.shutdown();
        }
    }

    /**
     * Thread factory that names its threads {@code <prefix>_<n>}, with {@code n} counting up
     * from zero per factory instance.
     */
    public static class WorkerThreadFactory implements ThreadFactory {

        // Atomic because a pool may ask for new threads from whichever thread submits work to it,
        // and several upstream threads schedule onto the same observeOn pool in this demo.
        private final AtomicInteger counter = new AtomicInteger();
        private final String prefix;

        /**
         * @param prefix text placed before the underscore in every thread name
         */
        public WorkerThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        /**
         * Creates a new thread for {@code r} named with the prefix and the next sequence number.
         *
         * @param r the task the thread will run
         * @return the new, not yet started thread
         */
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, prefix + "_" + counter.getAndIncrement());
        }
    }

    /**
     * Left-justifies {@code s} in a field of at least {@code n} characters, padding with spaces
     * on the right. Strings longer than {@code n} are returned at full length.
     *
     * @param s text to pad; {@code null} is rendered as {@code "null"}
     * @param n minimum field width; values of zero or less mean "no padding"
     * @return the padded text
     */
    public static String padRight(String s, int n) {
        if (n <= 0) {
            // A zero or negative width is not a valid format width; nothing to pad.
            return String.valueOf(s);
        }
        return String.format("%1$-" + n + "s", s);
    }

    /**
     * Right-justifies {@code s} in a field of at least {@code n} characters, padding with spaces
     * on the left. Strings longer than {@code n} are returned at full length.
     *
     * @param s text to pad; {@code null} is rendered as {@code "null"}
     * @param n minimum field width; values of zero or less mean "no padding"
     * @return the padded text
     */
    public static String padLeft(String s, int n) {
        if (n <= 0) {
            // A zero or negative width is not a valid format width; nothing to pad.
            return String.valueOf(s);
        }
        return String.format("%1$" + n + "s", s);
    }
}
create1 , subscribeOn1_0 , v1: 1 ,
flatMapV2 , subscribeOn1_0 , v1: 1 ,
create1 , subscribeOn1_0 , v1: 2 ,
create2 , subscribeOn2_0 , v1: 1 count: 1 ,
flatMapV2 , subscribeOn1_0 , v1: 2 ,
flatMapV3 , subscribeOn2_0 , v1: 1 v2: 1 ,
create2 , subscribeOn2_1 , v1: 2 count: 1 ,
flatMapV3 , subscribeOn2_1 , v1: 2 v2: 1 ,
create2 , subscribeOn2_1 , v1: 2 count: 2 ,
create3 , subscribeOn3_0 , v1: 2 v2: 1 count: 1 ,
flatMapV3 , subscribeOn2_1 , v1: 2 v2: 2 ,
create2 , subscribeOn2_0 , v1: 1 count: 2 ,
flatMapV3 , subscribeOn2_0 , v1: 1 v2: 2 ,
create3 , subscribeOn3_1 , v1: 1 v2: 1 count: 1 ,
create3 , subscribeOn3_3 , v1: 1 v2: 2 count: 1 ,
create3 , subscribeOn3_2 , v1: 2 v2: 2 count: 1 ,
create3 , subscribeOn3_0 , v1: 2 v2: 1 count: 2 ,
create3 , subscribeOn3_1 , v1: 1 v2: 1 count: 2 ,
create3 , subscribeOn3_2 , v1: 2 v2: 2 count: 2 ,
doOnNext3 , observeOn3_1 , v1: 2 v2: 1 , i: 1
create3 , subscribeOn3_3 , v1: 1 v2: 2 count: 2 ,
doOnNext3 , observeOn3_0 , v1: 1 v2: 1 , i: 1
doOnNext3 , observeOn3_3 , v1: 2 v2: 2 , i: 1
doOnNext3 , observeOn3_1 , v1: 2 v2: 1 , i: 2
doOnNext3 , observeOn3_2 , v1: 1 v2: 2 , i: 1
doOnNext3 , observeOn3_3 , v1: 2 v2: 2 , i: 2
doOnNext3 , observeOn3_2 , v1: 1 v2: 2 , i: 2
doOnNext2 , observeOn2_1 , v1: 1 , i: map3 , observeOn3_2 , , 1
doOnNext2 , observeOn2_1 , v1: 1 , i: map3 , observeOn3_2 , , 2
doOnNext , observeOn1_0 , , i: map2 , observeOn2_1 , , map3 , observeOn3_2 , , 1
doOnNext3 , observeOn3_0 , v1: 1 v2: 1 , i: 2
doOnNext2 , observeOn2_2 , v1: 1 , i: map3 , observeOn3_0 , , 1
doOnNext2 , observeOn2_0 , v1: 2 , i: map3 , observeOn3_1 , , 1
doOnNext2 , observeOn2_0 , v1: 2 , i: map3 , observeOn3_3 , , 1
doOnNext2 , observeOn2_0 , v1: 2 , i: map3 , observeOn3_1 , , 2
doOnNext2 , observeOn2_0 , v1: 2 , i: map3 , observeOn3_3 , , 2
doOnNext , observeOn1_0 , , i: map2 , observeOn2_1 , , map3 , observeOn3_2 , , 2
doOnNext , observeOn1_0 , , i: map2 , observeOn2_2 , , map3 , observeOn3_0 , , 1
doOnNext2 , observeOn2_3 , v1: 1 , i: map3 , observeOn3_0 , , 2
doOnNext , observeOn1_0 , , i: map2 , observeOn2_0 , , map3 , observeOn3_1 , , 1
doOnNext , observeOn1_0 , , i: map2 , observeOn2_0 , , map3 , observeOn3_3 , , 1
doOnNext , observeOn1_0 , , i: map2 , observeOn2_0 , , map3 , observeOn3_1 , , 2
bsubscribe, main , , s: map1 , observeOn1_0 , , map2 , observeOn2_1 , , map3 , observeOn3_2 , , 1
doOnNext , observeOn1_0 , , i: map2 , observeOn2_0 , , map3 , observeOn3_3 , , 2
bsubscribe, main , , s: map1 , observeOn1_0 , , map2 , observeOn2_1 , , map3 , observeOn3_2 , , 2
doOnNext , observeOn1_0 , , i: map2 , observeOn2_3 , , map3 , observeOn3_0 , , 2
bsubscribe, main , , s: map1 , observeOn1_0 , , map2 , observeOn2_2 , , map3 , observeOn3_0 , , 1
bsubscribe, main , , s: map1 , observeOn1_0 , , map2 , observeOn2_0 , , map3 , observeOn3_1 , , 1
bsubscribe, main , , s: map1 , observeOn1_0 , , map2 , observeOn2_0 , , map3 , observeOn3_3 , , 1
bsubscribe, main , , s: map1 , observeOn1_0 , , map2 , observeOn2_0 , , map3 , observeOn3_1 , , 2
bsubscribe, main , , s: map1 , observeOn1_0 , , map2 , observeOn2_0 , , map3 , observeOn3_3 , , 2
bsubscribe, main , , s: map1 , observeOn1_0 , , map2 , observeOn2_3 , , map3 , observeOn3_0 , , 2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment