Created
November 5, 2015 10:05
-
-
Save akarnokd/afc689617afbcedaa2b1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Queue; | |
import java.util.concurrent.*; | |
import java.util.concurrent.atomic.*; | |
import rx.*; | |
import rx.Observable.Operator; | |
import rx.functions.Func1; | |
import rx.schedulers.Schedulers; | |
import rx.subjects.*; | |
import rx.subscriptions.*; | |
public class DynamicWorkCount { | |
public static void main(String[] args) throws Exception { | |
BehaviorSubject<Integer> workerCount = BehaviorSubject.create(2); | |
PublishSubject<Integer> work = PublishSubject.create(); | |
work.lift(new DynamicConcurrentMergeOperator<>(v -> | |
Observable.just(v + 10) | |
.delay(500, TimeUnit.MILLISECONDS, Schedulers.io()) | |
.map(n -> Thread.currentThread() + ": " + n) | |
, workerCount)) | |
.onBackpressureBuffer() | |
.subscribe(System.out::println, Throwable::printStackTrace); | |
work.onNext(1); | |
work.onNext(2); | |
work.onNext(3); | |
work.onNext(4); | |
work.onNext(5); | |
work.onNext(6); | |
Thread.sleep(2000); | |
workerCount.onNext(5); | |
work.onNext(-1); | |
work.onNext(-2); | |
work.onNext(-3); | |
work.onNext(-4); | |
work.onNext(-5); | |
work.onNext(-6); | |
Thread.sleep(1500); | |
} | |
static final class DynamicConcurrentMergeOperator<T, U, R> implements Operator<R, T> { | |
final Func1<? super T, ? extends Observable<? extends R>> mapper; | |
final Observable<Integer> workerCount; | |
public DynamicConcurrentMergeOperator( | |
Func1<? super T, ? extends Observable<? extends R>> mapper, | |
Observable<Integer> workerCount) { | |
this.mapper = mapper; | |
this.workerCount = workerCount; | |
} | |
@Override | |
public Subscriber<? super T> call(Subscriber<? super R> t) { | |
DynamicConcurrentMerge<T, R> parent = new DynamicConcurrentMerge<>(t, mapper); | |
t.add(parent); | |
parent.init(workerCount); | |
return parent; | |
} | |
} | |
static final class DynamicConcurrentMerge<T, R> extends Subscriber<T> { | |
final Subscriber<? super R> actual; | |
final Func1<? super T, ? extends Observable<? extends R>> mapper; | |
final Queue<T> queue; | |
final CopyOnWriteArrayList<DynamicWorker<T, R>> workers; | |
final CompositeSubscription composite; | |
final AtomicInteger wipActive; | |
final AtomicBoolean once; | |
long id; | |
public DynamicConcurrentMerge(Subscriber<? super R> actual, | |
Func1<? super T, ? extends Observable<? extends R>> mapper) { | |
this.actual = actual; | |
this.mapper = mapper; | |
this.queue = new ConcurrentLinkedQueue<>(); | |
this.workers = new CopyOnWriteArrayList<>(); | |
this.composite = new CompositeSubscription(); | |
this.wipActive = new AtomicInteger(1); | |
this.once = new AtomicBoolean(); | |
this.add(composite); | |
this.request(0); | |
} | |
public void init(Observable<Integer> workerCount) { | |
Subscription wc = workerCount.subscribe(n -> { | |
int n0 = workers.size(); | |
if (n0 < n) { | |
for (int i = n0; i < n; i++) { | |
DynamicWorker<T, R> dw = new DynamicWorker<>(++id, this); | |
workers.add(dw); | |
request(1); | |
dw.tryNext(); | |
} | |
} else | |
if (n0 > n) { | |
for (int i = 0; i < n; i++) { | |
workers.get(i).start(); | |
} | |
for (int i = n; n < n0; i++) { | |
workers.get(i).stop(); | |
} | |
} | |
if (!once.get() && once.compareAndSet(false, true)) { | |
request(n); | |
} | |
}, this::onError); | |
composite.add(wc); | |
} | |
void requestMore(long n) { | |
request(n); | |
} | |
@Override | |
public void onNext(T t) { | |
queue.offer(t); | |
wipActive.getAndIncrement(); | |
for (DynamicWorker<T, R> w : workers) { | |
w.tryNext(); | |
} | |
} | |
@Override | |
public void onError(Throwable e) { | |
composite.unsubscribe(); | |
actual.onError(e); | |
} | |
@Override | |
public void onCompleted() { | |
if (wipActive.decrementAndGet() == 0) { | |
actual.onCompleted(); | |
} | |
} | |
} | |
static final class DynamicWorker<T, R> { | |
final long id; | |
final AtomicBoolean running; | |
final DynamicConcurrentMerge<T, R> parent; | |
final AtomicBoolean stop; | |
public DynamicWorker(long id, DynamicConcurrentMerge<T, R> parent) { | |
this.id = id; | |
this.parent = parent; | |
this.stop = new AtomicBoolean(); | |
this.running = new AtomicBoolean(); | |
} | |
public void tryNext() { | |
if (!running.get() && running.compareAndSet(false, true)) { | |
T t; | |
if (stop.get()) { | |
parent.workers.remove(this); | |
return; | |
} | |
t = parent.queue.poll(); | |
if (t == null) { | |
running.set(false); | |
return; | |
} | |
Observable<? extends R> out = parent.mapper.call(t); | |
Subscriber<R> s = new Subscriber<R>() { | |
@Override | |
public void onNext(R t) { | |
parent.actual.onNext(t); | |
} | |
@Override | |
public void onError(Throwable e) { | |
parent.onError(e); | |
} | |
@Override | |
public void onCompleted() { | |
parent.onCompleted(); | |
if (parent.wipActive.get() != 0) { | |
running.set(false); | |
parent.requestMore(1); | |
tryNext(); | |
} | |
} | |
}; | |
parent.composite.add(s); | |
s.add(Subscriptions.create(() -> { | |
parent.composite.remove(s); | |
})); | |
out.subscribe(s); | |
} | |
} | |
public void start() { | |
stop.set(false); | |
tryNext(); | |
} | |
public void stop() { | |
stop.set(true); | |
if (running.compareAndSet(false, true)) { | |
parent.workers.remove(this); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment