Skip to content

Instantly share code, notes, and snippets.

@yshrsmz
Created December 19, 2016 08:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yshrsmz/e855b0447814c2737857397cb7880585 to your computer and use it in GitHub Desktop.
Save yshrsmz/e855b0447814c2737857397cb7880585 to your computer and use it in GitHub Desktop.
0-1
2-1
1-1
4-1
7-1
7-2
10-1
5-1
5-2
8-1
8-2
11-1
11-2
14-1
14-2
17-1
17-2
20-1
20-2
23-1
23-2
26-1
26-2
29-1
29-2
32-1
32-2
35-1
35-2
38-1
38-2
38-2 - disposeAction: thread is different - resourceFactory: RxIoScheduler-5, disposeAction: SingleThreadScheduler-3-1
41-1
41-2
44-1
44-2
47-1
47-2
50-1
50-2
53-1
53-2
56-1
56-2
59-1
13-1
13-2
16-1
16-2
19-1
19-2
22-1
22-2
25-1
25-2
28-1
28-2
28-2 - disposeAction: thread is different - resourceFactory: RxIoScheduler-5, disposeAction: SingleThreadScheduler-2-1
31-1
31-2
34-1
34-2
37-1
37-2
40-1
40-2
43-1
43-2
46-1
46-2
46-2 - disposeAction: thread is different - resourceFactory: RxIoScheduler-5, disposeAction: SingleThreadScheduler-2-1
49-1
49-2
52-1
52-2
55-1
55-2
58-1
58-2
61-1
61-2
64-1
64-2
67-1
67-2
67-2 - disposeAction: thread is different - resourceFactory: RxIoScheduler-5, disposeAction: SingleThreadScheduler-2-1
70-1
59-2
73-1
59-3
67-3
76-1
79-1
82-1
85-1
3-1
6-1
59-2 - disposeAction: thread is different - resourceFactory: RxIoScheduler-2, disposeAction: SingleThreadScheduler-3-1
62-1
65-1
85-2
9-1
88-1
12-1
15-1
18-1
21-1
24-1
24-2
27-1
30-1
33-1
36-1
36-2
39-1
42-1
68-1
91-1
71-1
94-1
74-1
97-1
42-2
42-3
74-2
74-3
45-1
77-1
80-1
83-1
86-1
86-2
45-2
45-3
48-1
51-1
54-1
57-1
57-2
57-3
60-1
60-2
60-3
63-1
66-1
69-1
69-2
69-3
72-1
72-2
72-3
75-1
78-1
81-1
81-2
81-3
84-1
84-2
84-3
87-1
87-2
87-3
90-1
90-2
90-3
93-1
93-2
93-3
96-1
96-2
96-3
99-1
89-1
99-2
99-3
92-1
95-1
95-2
98-1
98-2
package net.yslibrary.rxusingsample.filtersiwtchifemptytest;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.internal.util.RxThreadFactory;
import rx.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
/**
* Created by yshrsmz on 16/12/19.
*/
public class Test2 {
public static void main(String[] args) throws Exception {
Subscription subscription = null;
CountDownLatch latch = new CountDownLatch(100);
for (int i = 0; i < 100; i++) {
if (subscription != null) {
subscription.unsubscribe();
}
int finalI = i;
subscription = Using.get(finalI + "-1") // 1
.filter(s -> false)
.switchIfEmpty(Using.get(finalI + "-2") // 2
.subscribeOn(Schedulers.io())
.flatMap(s -> Using.get(finalI + "-3"))) //3
.compose(SingleThreadSchedulerPool.doInSingleThread())
.doOnNext(s -> latch.countDown())
.subscribe();
}
latch.await();
subscription.unsubscribe();
}
static class Using {
public static Observable<String> get(String prefix) {
return Observable.using(() -> {
System.out.println(prefix);
String threadName = Thread.currentThread().getName();
return threadName;
}, s -> {
return Observable.fromCallable(() -> {
return s;
});
}, s -> {
String disposeThread = Thread.currentThread().getName();
if (s.equals(disposeThread)) {
return;
}
System.out.println(prefix + " - disposeAction: thread is different - resourceFactory: " + s + ", disposeAction: " + disposeThread);
});
}
}
static class SingleThreadSchedulerPool {
private final static List<Scheduler> SCHEDULERS = new ArrayList<>();
private final static AtomicLong COUNT = new AtomicLong(0L);
static {
SCHEDULERS.add(Schedulers.from(Executors.newSingleThreadExecutor(new RxThreadFactory("SingleThreadScheduler-1-"))));
SCHEDULERS.add(Schedulers.from(Executors.newSingleThreadExecutor(new RxThreadFactory("SingleThreadScheduler-2-"))));
SCHEDULERS.add(Schedulers.from(Executors.newSingleThreadExecutor(new RxThreadFactory("SingleThreadScheduler-3-"))));
}
private SingleThreadSchedulerPool() {
// no-op
}
public static Scheduler get() {
long current = COUNT.getAndIncrement();
return SCHEDULERS.get((int) (current % 3)); // 3 is the size of SCHEDULERS
}
public static <T> Observable.Transformer<T, T> doInSingleThread() {
return tObservable -> {
Scheduler s = get();
return tObservable
.subscribeOn(s)
.unsubscribeOn(s);
};
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment