Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Created November 5, 2015 10:05
Show Gist options
  • Save akarnokd/afc689617afbcedaa2b1 to your computer and use it in GitHub Desktop.
Save akarnokd/afc689617afbcedaa2b1 to your computer and use it in GitHub Desktop.
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import rx.*;
import rx.Observable.Operator;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.*;
import rx.subscriptions.*;
public class DynamicWorkCount {
public static void main(String[] args) throws Exception {
BehaviorSubject<Integer> workerCount = BehaviorSubject.create(2);
PublishSubject<Integer> work = PublishSubject.create();
work.lift(new DynamicConcurrentMergeOperator<>(v ->
Observable.just(v + 10)
.delay(500, TimeUnit.MILLISECONDS, Schedulers.io())
.map(n -> Thread.currentThread() + ": " + n)
, workerCount))
.onBackpressureBuffer()
.subscribe(System.out::println, Throwable::printStackTrace);
work.onNext(1);
work.onNext(2);
work.onNext(3);
work.onNext(4);
work.onNext(5);
work.onNext(6);
Thread.sleep(2000);
workerCount.onNext(5);
work.onNext(-1);
work.onNext(-2);
work.onNext(-3);
work.onNext(-4);
work.onNext(-5);
work.onNext(-6);
Thread.sleep(1500);
}
static final class DynamicConcurrentMergeOperator<T, U, R> implements Operator<R, T> {
final Func1<? super T, ? extends Observable<? extends R>> mapper;
final Observable<Integer> workerCount;
public DynamicConcurrentMergeOperator(
Func1<? super T, ? extends Observable<? extends R>> mapper,
Observable<Integer> workerCount) {
this.mapper = mapper;
this.workerCount = workerCount;
}
@Override
public Subscriber<? super T> call(Subscriber<? super R> t) {
DynamicConcurrentMerge<T, R> parent = new DynamicConcurrentMerge<>(t, mapper);
t.add(parent);
parent.init(workerCount);
return parent;
}
}
static final class DynamicConcurrentMerge<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends Observable<? extends R>> mapper;
final Queue<T> queue;
final CopyOnWriteArrayList<DynamicWorker<T, R>> workers;
final CompositeSubscription composite;
final AtomicInteger wipActive;
final AtomicBoolean once;
long id;
public DynamicConcurrentMerge(Subscriber<? super R> actual,
Func1<? super T, ? extends Observable<? extends R>> mapper) {
this.actual = actual;
this.mapper = mapper;
this.queue = new ConcurrentLinkedQueue<>();
this.workers = new CopyOnWriteArrayList<>();
this.composite = new CompositeSubscription();
this.wipActive = new AtomicInteger(1);
this.once = new AtomicBoolean();
this.add(composite);
this.request(0);
}
public void init(Observable<Integer> workerCount) {
Subscription wc = workerCount.subscribe(n -> {
int n0 = workers.size();
if (n0 < n) {
for (int i = n0; i < n; i++) {
DynamicWorker<T, R> dw = new DynamicWorker<>(++id, this);
workers.add(dw);
request(1);
dw.tryNext();
}
} else
if (n0 > n) {
for (int i = 0; i < n; i++) {
workers.get(i).start();
}
for (int i = n; n < n0; i++) {
workers.get(i).stop();
}
}
if (!once.get() && once.compareAndSet(false, true)) {
request(n);
}
}, this::onError);
composite.add(wc);
}
void requestMore(long n) {
request(n);
}
@Override
public void onNext(T t) {
queue.offer(t);
wipActive.getAndIncrement();
for (DynamicWorker<T, R> w : workers) {
w.tryNext();
}
}
@Override
public void onError(Throwable e) {
composite.unsubscribe();
actual.onError(e);
}
@Override
public void onCompleted() {
if (wipActive.decrementAndGet() == 0) {
actual.onCompleted();
}
}
}
static final class DynamicWorker<T, R> {
final long id;
final AtomicBoolean running;
final DynamicConcurrentMerge<T, R> parent;
final AtomicBoolean stop;
public DynamicWorker(long id, DynamicConcurrentMerge<T, R> parent) {
this.id = id;
this.parent = parent;
this.stop = new AtomicBoolean();
this.running = new AtomicBoolean();
}
public void tryNext() {
if (!running.get() && running.compareAndSet(false, true)) {
T t;
if (stop.get()) {
parent.workers.remove(this);
return;
}
t = parent.queue.poll();
if (t == null) {
running.set(false);
return;
}
Observable<? extends R> out = parent.mapper.call(t);
Subscriber<R> s = new Subscriber<R>() {
@Override
public void onNext(R t) {
parent.actual.onNext(t);
}
@Override
public void onError(Throwable e) {
parent.onError(e);
}
@Override
public void onCompleted() {
parent.onCompleted();
if (parent.wipActive.get() != 0) {
running.set(false);
parent.requestMore(1);
tryNext();
}
}
};
parent.composite.add(s);
s.add(Subscriptions.create(() -> {
parent.composite.remove(s);
}));
out.subscribe(s);
}
}
public void start() {
stop.set(false);
tryNext();
}
public void stop() {
stop.set(true);
if (running.compareAndSet(false, true)) {
parent.workers.remove(this);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment