Skip to content

Instantly share code, notes, and snippets.

@yshrsmz
Last active December 8, 2016 05:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yshrsmz/d586c319ab8cfb47230b66affdc80810 to your computer and use it in GitHub Desktop.
Save yshrsmz/d586c319ab8cfb47230b66affdc80810 to your computer and use it in GitHub Desktop.
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.internal.util.RxThreadFactory;
import rx.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Reproduction harness for nested {@code Observable.using} chains.
 *
 * <p>It subscribes to the same chain repeatedly, unsubscribing the previous subscription
 * before starting the next one, and prints a diagnostic line whenever a resource's dispose
 * action runs on a thread other than the one that ran its resource factory.
 */
public class UsingComplexTest {

    /** Number of subscriptions created; also the initial count of the latch awaited by main. */
    private static final int ITERATIONS = 100;

    /**
     * Creates {@link #ITERATIONS} subscriptions, 10 ms apart, and then blocks until the latch
     * (counted down from each chain's {@code doOnUnsubscribe} hook) reaches zero.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while sleeping or awaiting the latch
     */
    public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(ITERATIONS);
        Subscription subscription = null;
        for (int i = 0; i < ITERATIONS; i++) {
            Thread.sleep(10);
            // Drop the previous chain before starting a new one; this unsubscribe is issued
            // from the main thread.
            if (subscription != null) {
                subscription.unsubscribe();
            }
            String prefix = "test" + i + "-";
            subscription = Using.get(prefix + "1")
                    .flatMap(s -> Using.get(prefix + "2")
                            // Always-false filter empties the stream so switchIfEmpty takes over.
                            .filter(s1 -> false)
                            .switchIfEmpty(Using.get(prefix + "3")
                                    .compose(SingleThreadSchedulerPool.doInSingleThread())
                                    .flatMap(s2 -> Using.get(prefix + "4"))))
                    .subscribeOn(Schedulers.io())
                    .doOnUnsubscribe(latch::countDown)
                    .subscribe();
        }
        latch.await();
    }

    /** Factory for the instrumented {@code Observable.using} sources used by the harness. */
    static class Using {

        /**
         * Builds an observable whose "resource" is the name of the thread that ran the
         * resource factory.
         *
         * <p>The factory prints {@code prefix}; the observable emits the captured thread name;
         * the dispose action prints a diagnostic line when it runs on a thread with a
         * different name.
         *
         * @param prefix label printed on resource creation and used in the diagnostic line
         * @return an observable emitting the name of the resource-factory thread
         */
        public static Observable<String> get(String prefix) {
            return Observable.using(
                    () -> {
                        System.out.println(prefix);
                        return Thread.currentThread().getName();
                    },
                    s -> Observable.fromCallable(() -> s),
                    s -> {
                        String disposeThread = Thread.currentThread().getName();
                        // Only report when disposal happened on a different thread.
                        if (!s.equals(disposeThread)) {
                            System.out.println(prefix + " - disposeAction: thread is different - resourceFactory: " + s + ", disposeAction: " + disposeThread);
                        }
                    });
        }
    }

    /** Fixed set of schedulers, each backed by its own single-thread executor. */
    static class SingleThreadSchedulerPool {
        private static final List<Scheduler> SCHEDULERS = new ArrayList<>();
        private static final AtomicLong COUNT = new AtomicLong(0L);

        static {
            SCHEDULERS.add(Schedulers.from(Executors.newSingleThreadExecutor(new RxThreadFactory("SingleThreadScheduler-1-"))));
            SCHEDULERS.add(Schedulers.from(Executors.newSingleThreadExecutor(new RxThreadFactory("SingleThreadScheduler-2-"))));
            SCHEDULERS.add(Schedulers.from(Executors.newSingleThreadExecutor(new RxThreadFactory("SingleThreadScheduler-3-"))));
        }

        private SingleThreadSchedulerPool() {
            // no-op
        }

        /**
         * Returns the next scheduler in round-robin order.
         *
         * @return one of the pooled schedulers
         */
        public static Scheduler get() {
            long current = COUNT.getAndIncrement();
            // Derive the modulus from the list instead of repeating its size as a literal, and
            // use floorMod so the index stays non-negative even if the counter wraps around.
            return SCHEDULERS.get((int) Math.floorMod(current, (long) SCHEDULERS.size()));
        }

        /**
         * Returns a transformer that, each time it is applied, takes the next scheduler from
         * {@link #get()} and uses it for both {@code subscribeOn} and {@code unsubscribeOn}.
         *
         * @param <T> element type of the observable being transformed
         * @return the transformer
         */
        public static <T> Observable.Transformer<T, T> doInSingleThread() {
            return tObservable -> {
                Scheduler s = get();
                return tObservable
                        .subscribeOn(s)
                        .unsubscribeOn(s);
            };
        }
    }
}
/usr/lib/jvm/java-8-oracle/bin/java -Didea.launcher.port=7545 -Didea.launcher.bin.path=/opt/idea-IU/bin -Dfile.encoding=UTF-8 -classpath /usr/lib/jvm/java-8-oracle/jre/lib/charsets.jar:/usr/lib/jvm/java-8-oracle/jre/lib/deploy.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/cldrdata.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/dnsns.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/jaccess.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/jfxrt.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/localedata.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/nashorn.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/sunec.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/sunjce_provider.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/sunpkcs11.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/zipfs.jar:/usr/lib/jvm/java-8-oracle/jre/lib/javaws.jar:/usr/lib/jvm/java-8-oracle/jre/lib/jce.jar:/usr/lib/jvm/java-8-oracle/jre/lib/jfr.jar:/usr/lib/jvm/java-8-oracle/jre/lib/jfxswt.jar:/usr/lib/jvm/java-8-oracle/jre/lib/jsse.jar:/usr/lib/jvm/java-8-oracle/jre/lib/management-agent.jar:/usr/lib/jvm/java-8-oracle/jre/lib/plugin.jar:/usr/lib/jvm/java-8-oracle/jre/lib/resources.jar:/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar:/home/yshrsmz/repos/rxusingsample/build/classes/main:/home/yshrsmz/.gradle/caches/modules-2/files-2.1/io.reactivex/rxjava/1.2.3/7fe1a94c1aeb958acc876fe616922cc191f3222c/rxjava-1.2.3.jar:/opt/idea-IU/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain net.yslibrary.rxusingsample.usingcomplextest.UsingComplexTest
test0-1
test1-1
test1-1 - disposeAction: thread is different - resourceFactory: RxIoScheduler-3, disposeAction: main
test2-1
test2-1 - disposeAction: thread is different - resourceFactory: RxIoScheduler-2, disposeAction: main
test3-1
test3-2
test2-2
test1-2
test3-1 - disposeAction: thread is different - resourceFactory: RxIoScheduler-4, disposeAction: main
test4-1
test4-2
test4-3
test4-4
test5-1
test5-2
test5-3
test5-4
test6-1
test6-2
test6-3
test6-4
test7-1
test7-2
test7-3
test7-4
test8-1
test8-2
test8-3
test8-4
test9-1
test9-2
test9-3
test9-4
test10-1
test10-2
test10-3
test10-4
test11-1
test11-2
test11-3
test11-4
test12-1
test12-2
test12-3
test12-4
test13-1
test13-2
test13-3
test13-4
test14-1
test14-2
test14-3
test14-4
test15-1
test15-2
test15-3
test15-4
test16-1
test16-2
test16-3
test16-4
test17-1
test17-2
test17-3
test17-4
test18-1
test18-2
test18-3
test18-4
test19-1
test19-2
test19-3
test19-4
test20-1
test20-2
test20-3
test20-4
test21-1
test21-2
test21-3
test21-4
test22-1
test22-2
test22-3
test22-4
test23-1
test23-2
test23-3
test23-4
test24-1
test24-2
test24-3
test24-4
test25-1
test25-2
test25-3
test25-4
test26-1
test26-2
test26-3
test26-4
test27-1
test27-2
test27-3
test27-4
test28-1
test28-2
test28-3
test28-4
test29-1
test29-2
test29-3
test29-4
test30-1
test30-2
test30-3
test30-4
test31-1
test31-2
test31-3
test31-4
test32-1
test32-2
test32-3
test32-4
test33-1
test33-2
test33-3
test33-4
test34-1
test34-2
test34-3
test34-4
test35-1
test35-2
test35-3
test35-4
test36-1
test36-2
test36-3
test36-4
test37-1
test37-2
test37-3
test37-4
test38-1
test38-2
test38-3
test38-4
test39-1
test39-2
test39-3
test39-4
test40-1
test40-2
test40-3
test40-4
test41-1
test41-2
test41-3
test41-4
test42-1
test42-2
test42-3
test42-4
test43-1
test43-2
test43-3
test43-4
test44-1
test44-2
test44-3
test44-4
test45-1
test45-2
test45-3
test45-4
test46-1
test46-2
test46-3
test46-4
test47-1
test47-2
test47-3
test47-4
test48-1
test48-2
test48-3
test48-4
test49-1
test49-2
test49-3
test49-4
test50-1
test50-2
test50-3
test50-4
test51-1
test51-2
test51-3
test51-4
test52-1
test52-2
test52-3
test52-4
test53-1
test53-2
test53-3
test53-4
test54-1
test54-2
test54-3
test54-4
test55-1
test55-2
test55-3
test55-4
test56-1
test56-2
test56-3
test56-4
test57-1
test57-2
test57-3
test57-4
test58-1
test58-2
test58-3
test58-4
test59-1
test59-2
test59-3
test59-4
test60-1
test60-2
test60-3
test60-4
test61-1
test61-2
test61-3
test61-4
test62-1
test62-2
test62-3
test62-4
test63-1
test63-2
test63-3
test63-4
test64-1
test64-2
test64-3
test64-4
test65-1
test65-2
test65-3
test65-4
test66-1
test66-2
test66-3
test66-4
test67-1
test67-2
test67-3
test67-4
test68-1
test68-2
test68-3
test68-4
test69-1
test69-2
test69-3
test69-4
test70-1
test70-2
test70-3
test70-4
test71-1
test71-2
test71-3
test71-4
test72-1
test72-2
test72-3
test72-4
test73-1
test73-2
test73-3
test73-4
test74-1
test74-2
test74-3
test74-4
test75-1
test75-2
test75-3
test75-4
test76-1
test76-2
test76-3
test76-4
test77-1
test77-2
test77-3
test77-4
test78-1
test78-2
test78-3
test78-4
test79-1
test79-2
test79-3
test79-4
test80-1
test80-2
test80-3
test80-4
test81-1
test81-2
test81-3
test81-4
test82-1
test82-2
test82-3
test82-4
test83-1
test83-2
test83-3
test83-4
test84-1
test84-2
test84-3
test84-4
test85-1
test85-2
test85-3
test85-4
test86-1
test86-2
test86-3
test86-4
test87-1
test87-2
test87-3
test87-4
test88-1
test88-2
test88-3
test88-4
test89-1
test89-2
test89-3
test89-4
test90-1
test90-2
test90-3
test90-4
test91-1
test91-2
test91-3
test91-4
test92-1
test92-2
test92-3
test92-4
test93-1
test93-2
test93-3
test93-4
test94-1
test94-2
test94-3
test94-4
test95-1
test95-2
test95-3
test95-4
test96-1
test96-2
test96-3
test96-4
test97-1
test97-2
test97-3
test97-4
test98-1
test98-2
test98-3
test98-4
test99-1
test99-2
test99-3
test99-4
Process finished with exit code 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment