Skip to content

Instantly share code, notes, and snippets.

@abersnaze
Last active June 27, 2016 21:47
Show Gist options
  • Save abersnaze/07d995f63ed6100ef60bd9ac980d59ed to your computer and use it in GitHub Desktop.
Save abersnaze/07d995f63ed6100ef60bd9ac980d59ed to your computer and use it in GitHub Desktop.
package rx.schedulers;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
import rx.subjects.ReplaySubject;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
public class CoarseGrainCompositeScheduler extends Scheduler {
private final Scheduler actualScheduler;
private final PublishSubject<Observable<Void>> workerQueue;
public CoarseGrainCompositeScheduler(Func1<Observable<Observable<Void>>, Observable<Void>> combine, int maxConcurrent,
Scheduler actualScheduler) {
this.actualScheduler = actualScheduler;
this.workerQueue = PublishSubject.<Observable<Void>>create();
combine.call(workerQueue).subscribe();
}
private static final Subscription SUBSCRIBED = new Subscription() {
@Override
public void unsubscribe() {
}
@Override
public boolean isUnsubscribed() {
return false;
}
};
private static final Subscription UNSUBSCRIBED = Subscriptions.unsubscribed();
@SuppressWarnings("serial")
private static abstract class ScheduledAction extends AtomicReference<Subscription> implements Subscription {
public final void call(Worker actualWorker) {
Subscription oldState = get();
// either SUBSCRIBED or UNSUBSCRIBED
if (oldState == UNSUBSCRIBED) {
// no need to schedule return
return;
}
if (oldState != SUBSCRIBED) {
// has already been scheduled return
// should not be able to get here but handle it anyway.
return;
}
Subscription newState = callActual(actualWorker);
if (!compareAndSet(SUBSCRIBED, newState)) {
// set would only fail if the new current state is either
// UNSUBSCRIBED or some other subscription from a concurrent
// call to this method. Unsubscribe from the action just
// scheduled.
newState.unsubscribe();
}
}
abstract Subscription callActual(Worker actualWorker);
@Override
public boolean isUnsubscribed() {
return get().isUnsubscribed();
}
@Override
public void unsubscribe() {
Subscription oldState;
// no matter what the current state is the new state is going to be
Subscription newState = UNSUBSCRIBED;
do {
oldState = get();
if (oldState == UNSUBSCRIBED) {
// the action has already been unsubscribed
return;
}
} while (!compareAndSet(oldState, newState));
if (oldState != SUBSCRIBED) {
// the action was scheduled. stop it.
oldState.unsubscribe();
}
}
}
@SuppressWarnings("serial")
private static class ImmediateAction extends ScheduledAction {
private final Action0 action;
public ImmediateAction(Action0 action) {
this.action = action;
}
@Override
public Subscription callActual(Worker actualWorker) {
return actualWorker.schedule(action);
}
}
@SuppressWarnings("serial")
private static class DelayedAction extends ScheduledAction {
private final Action0 action;
private final long delayTime;
private final TimeUnit unit;
public DelayedAction(Action0 action, long delayTime, TimeUnit unit) {
this.action = action;
this.delayTime = delayTime;
this.unit = unit;
}
@Override
public Subscription callActual(Worker actualWorker) {
return actualWorker.schedule(action, delayTime, unit);
}
}
@Override
public Worker createWorker() {
// a queue for the actions submitted while worker is waiting to get to
// the front of the workerQueue.
final ReplaySubject<ScheduledAction> actionQueue = ReplaySubject.create();
// complete the actionQueue when worker is unsubscribed to make room
// for the next worker in the workerQueue.
final Subscription subscription = BooleanSubscription.create(new Action0() {
@Override
public void call() {
actionQueue.onCompleted();
}
});
// add to the worker queue.
workerQueue.onNext(Observable.defer(new Func0<Observable<Void>>() {
@Override
public Observable<Void> call() {
final Worker actualWorker = actualScheduler.createWorker();
// pull from the actionQueue and start scheduling on the actual
// worker.
return actionQueue.map(new Func1<ScheduledAction, Void>() {
@Override
public Void call(ScheduledAction action) {
action.call(actualWorker);
return null;
}
});
}
}).ignoreElements());
return new Worker() {
@Override
public void unsubscribe() {
subscription.unsubscribe();
}
@Override
public boolean isUnsubscribed() {
return subscription.isUnsubscribed();
}
@Override
public Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit) {
// send a scheduled action to the actionQueue
DelayedAction delayedAction = new DelayedAction(action, delayTime, unit);
actionQueue.onNext(delayedAction);
return delayedAction;
}
@Override
public Subscription schedule(final Action0 action) {
// send a scheduled action to the actionQueue
ImmediateAction immediateAction = new ImmediateAction(action);
actionQueue.onNext(immediateAction);
return immediateAction;
}
};
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment