Skip to content

Instantly share code, notes, and snippets.

@abersnaze
Last active June 28, 2016 00:21
Show Gist options
  • Save abersnaze/c8c37847bc4bddf13dbc7cc75b99ae95 to your computer and use it in GitHub Desktop.
Save abersnaze/c8c37847bc4bddf13dbc7cc75b99ae95 to your computer and use it in GitHub Desktop.
package rx.schedulers;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
import rx.subscriptions.BooleanSubscription;
public class FineGrainCompositeScheduler extends Scheduler {
private final Scheduler actualScheduler;
private final PublishSubject<Observable<Void>> actionQueue;
public FineGrainCompositeScheduler(Func1<Observable<Observable<Void>>, Observable<Void>> combine, int maxConcurrent,
Scheduler actual) {
this.actualScheduler = actual;
this.actionQueue = PublishSubject.<Observable<Void>>create();
combine.call(actionQueue).subscribe();
}
@Override
public Worker createWorker() {
final Worker actualWorker = actualScheduler.createWorker();
return new Worker() {
@Override
public void unsubscribe() {
actualWorker.unsubscribe();
}
@Override
public boolean isUnsubscribed() {
return actualWorker.isUnsubscribed();
}
@Override
public Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit) {
final long nowBeforeQueued = TimeUnit.MILLISECONDS.convert(actualScheduler.now(), unit);
final BooleanSubscription subscription = new BooleanSubscription();
Observable<Void> queuedAction = Observable.defer(new Func0<Observable<Void>>() {
@Override
public Observable<Void> call() {
if (!subscription.isUnsubscribed()) {
long nowAfterQueued = actualScheduler.now();
long queuingDelay = nowAfterQueued - nowBeforeQueued;
actualWorker.schedule(action, delayTime - queuingDelay, unit);
}
return Observable.empty();
}
});
actionQueue.onNext(queuedAction);
return subscription;
}
@Override
public Subscription schedule(final Action0 action) {
final BooleanSubscription subscription = new BooleanSubscription();
Observable<Void> queuedAction = Observable.defer(new Func0<Observable<Void>>() {
@Override
public Observable<Void> call() {
if (!subscription.isUnsubscribed()) {
actualWorker.schedule(action);
}
return Observable.empty();
}
});
actionQueue.onNext(queuedAction);
return subscription;
}
};
}
}
@abersnaze
Copy link
Author

I concede that I most punted on doing any of the subscription management correctly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment