Skip to content

Instantly share code, notes, and snippets.

@farukonder
Created December 17, 2018 22:15
Show Gist options
  • Save farukonder/fd316130d3d605bcaebff0cb3742b436 to your computer and use it in GitHub Desktop.
Save farukonder/fd316130d3d605bcaebff0cb3742b436 to your computer and use it in GitHub Desktop.
package com.github.farukonder.experimenting.frp.rxjava2.simple;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
/**
 * Demo of how {@code subscribeOn} / {@code observeOn} pick threads in a two-level
 * RxJava 2 pipeline.
 *
 * <p>An outer source emits the integers 1 and 2. Each value is flat-mapped into an
 * inner source (see {@link #flatMapV2(String, Object)}) that emits the strings
 * "1" and "2". Both levels have their own pair of executor-backed schedulers, and
 * every stage prints one aligned log line containing the name of the thread it
 * ran on. The final consumer is attached with {@code blockingSubscribe}.
 */
public class Schedulers2LevelBlocking {

    /** Number of threads in each of the four demo pools. */
    private static final int POOL_SIZE = 5;

    /** Matches one or more whitespace characters; compiled once instead of per log line. */
    private static final Pattern WHITESPACE_RUN = Pattern.compile("\\s+");

    /** Backs the {@code observeOn} scheduler of the outer pipeline. */
    static final ExecutorService observeOn_1 =
            Executors.newFixedThreadPool(POOL_SIZE, new WorkerThreadFactory("observeOn1"));

    /** Backs the {@code subscribeOn} scheduler of the outer pipeline. */
    static final ExecutorService subscribeOn_1 =
            Executors.newFixedThreadPool(POOL_SIZE, new WorkerThreadFactory("subscribeOn1"));

    /** Backs the {@code observeOn} scheduler of every inner pipeline. */
    static final ExecutorService observeOn_2 =
            Executors.newFixedThreadPool(POOL_SIZE, new WorkerThreadFactory("observeOn2"));

    /** Backs the {@code subscribeOn} scheduler of every inner pipeline. */
    static final ExecutorService subscribeOn_2 =
            Executors.newFixedThreadPool(POOL_SIZE, new WorkerThreadFactory("subscribeOn2"));

    /**
     * Formats a log line with an empty trailing column.
     *
     * @param operation  label of the pipeline stage
     * @param threadName name of the thread the stage ran on
     * @param values     free-form description of the values being processed
     * @return the formatted line, as produced by {@link #getlogg(String, String, String, String)}
     */
    static String getlog(String operation, String threadName, String values) {
        return getlogg(operation, threadName, values, "");
    }

    /**
     * Formats a log line of four comma-separated columns.
     *
     * <p>The first three columns are right-padded to 10, 15 and 25 characters. The
     * fourth is appended unpadded with every run of whitespace collapsed to a
     * single space, so that nested log lines stay on one row.
     *
     * @param operation  label of the pipeline stage
     * @param threadName name of the thread the stage ran on
     * @param values     free-form description of the values being processed
     * @param i          trailing column text; must not be {@code null}
     * @return the formatted line
     */
    static String getlogg(String operation, String threadName, String values, String i) {
        return padRight(operation, 10) + ", "
                + padRight(threadName, 15) + ", "
                + padRight(values, 25) + ", "
                + WHITESPACE_RUN.matcher(i).replaceAll(" ");
    }

    /**
     * Builds the inner (second-level) pipeline for one outer value.
     *
     * <p>The returned observable emits "1" and "2", is subscribed on
     * {@code subscribeOn_2}, observed on {@code observeOn_2}, and maps every item
     * to a "map2" log line.
     *
     * @param threadName name of the thread that invoked the flat-map function (logged only)
     * @param v1         the outer value being expanded (logged only)
     * @return an observable of "map2" log lines, one per inner emission
     */
    static Observable<String> flatMapV2(String threadName, Object v1) {
        System.out.println(getlog("flatMapV2", threadName, "v1: " + v1));
        return Observable.<String>create(emitter -> {
                    // Stop producing as soon as the downstream has disposed.
                    for (int count = 1; count <= 2 && !emitter.isDisposed(); count++) {
                        System.out.println(getlog("create2", Thread.currentThread().getName(),
                                "v1: " + v1 + " count: " + count));
                        emitter.onNext("" + count);
                    }
                    emitter.onComplete();
                })
                .observeOn(Schedulers.from(observeOn_2))
                .subscribeOn(Schedulers.from(subscribeOn_2))
                .doOnNext(i -> System.out.println(
                        getlogg("doOnNext2", Thread.currentThread().getName(), "v1: " + v1, " i: " + i)))
                .map(i -> getlogg("map2", Thread.currentThread().getName(), "", i));
    }

    /**
     * Runs the demo: builds the outer pipeline, blocks until it completes, then
     * shuts the four executors down.
     *
     * @param args unused
     * @throws InterruptedException declared for compatibility; not thrown by the visible code
     */
    public static void main(String[] args) throws InterruptedException {
        try {
            Observable.<Integer>create(emitter -> {
                        // Stop producing as soon as the downstream has disposed.
                        for (int count = 1; count <= 2 && !emitter.isDisposed(); count++) {
                            System.out.println(getlog("create1", Thread.currentThread().getName(),
                                    "v1: " + count));
                            emitter.onNext(count);
                        }
                        emitter.onComplete();
                    })
                    .<String>flatMap(v -> flatMapV2(Thread.currentThread().getName(), v))
                    .observeOn(Schedulers.from(observeOn_1))
                    .subscribeOn(Schedulers.from(subscribeOn_1))
                    .doOnNext(i -> System.out.println(
                            getlogg("doOnNext", Thread.currentThread().getName(), "", " i: " + i)))
                    .map(i -> getlogg("map1", Thread.currentThread().getName(), "", i))
                    .blockingSubscribe(line -> System.out.println(
                            getlogg("bSubscribe", Thread.currentThread().getName(), "", " s: " + line)));
        } finally {
            // Always release the pools, even if the pipeline throws, so that idle
            // worker threads cannot keep the JVM running.
            observeOn_1.shutdown();
            subscribeOn_1.shutdown();
            observeOn_2.shutdown();
            subscribeOn_2.shutdown();
        }
    }

    /**
     * Thread factory that names its threads {@code <prefix>_<n>}, with {@code n}
     * starting at 0 and increasing by one per created thread.
     */
    public static class WorkerThreadFactory implements ThreadFactory {

        // Atomic because a pool may ask for new threads from several submitting
        // threads at once; a plain int++ could hand out duplicate numbers.
        private final AtomicInteger counter = new AtomicInteger();
        private final String prefix;

        /**
         * @param prefix text placed before the underscore in every thread name
         */
        public WorkerThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        /**
         * Creates a new, not yet started thread for the given task.
         *
         * @param r the task the thread will run
         * @return a thread named {@code <prefix>_<n>}
         */
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, prefix + "_" + counter.getAndIncrement());
        }
    }

    /**
     * Left-justifies {@code s} in a field of {@code n} characters by appending spaces.
     * Text longer than {@code n} is returned in full.
     *
     * @param s text to pad; {@code null} is rendered as "null"
     * @param n minimum field width; values of 0 or less mean "no padding"
     * @return the padded text
     */
    public static String padRight(String s, int n) {
        if (n <= 0) {
            // A non-positive width would produce an invalid format string.
            return String.valueOf(s);
        }
        return String.format("%1$-" + n + "s", s);
    }

    /**
     * Right-justifies {@code s} in a field of {@code n} characters by prepending spaces.
     * Text longer than {@code n} is returned in full.
     *
     * @param s text to pad; {@code null} is rendered as "null"
     * @param n minimum field width; values of 0 or less mean "no padding"
     * @return the padded text
     */
    public static String padLeft(String s, int n) {
        if (n <= 0) {
            // Width 0 is an invalid format string and a negative width would be
            // read as the left-justify flag, padding on the wrong side.
            return String.valueOf(s);
        }
        return String.format("%1$" + n + "s", s);
    }
}
create1 , subscribeOn1_0 , v1: 1 ,
flatMapV2 , subscribeOn1_0 , v1: 1 ,
create1 , subscribeOn1_0 , v1: 2 ,
create2 , subscribeOn2_0 , v1: 1 count: 1 ,
flatMapV2 , subscribeOn1_0 , v1: 2 ,
create2 , subscribeOn2_1 , v1: 2 count: 1 ,
create2 , subscribeOn2_0 , v1: 1 count: 2 ,
create2 , subscribeOn2_1 , v1: 2 count: 2 ,
doOnNext2 , observeOn2_1 , v1: 2 , i: 1
doOnNext2 , observeOn2_0 , v1: 1 , i: 1
doOnNext2 , observeOn2_1 , v1: 2 , i: 2
doOnNext2 , observeOn2_0 , v1: 1 , i: 2
doOnNext , observeOn1_0 , , i: map2 , observeOn2_0 , , 1
doOnNext , observeOn1_0 , , i: map2 , observeOn2_1 , , 1
doOnNext , observeOn1_0 , , i: map2 , observeOn2_1 , , 2
doOnNext , observeOn1_0 , , i: map2 , observeOn2_0 , , 2
bSubscribe, main , , s: map1 , observeOn1_0 , , map2 , observeOn2_0 , , 1
bSubscribe, main , , s: map1 , observeOn1_0 , , map2 , observeOn2_1 , , 1
bSubscribe, main , , s: map1 , observeOn1_0 , , map2 , observeOn2_1 , , 2
bSubscribe, main , , s: map1 , observeOn1_0 , , map2 , observeOn2_0 , , 2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment