Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Created April 3, 2014 08:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akarnokd/9950338 to your computer and use it in GitHub Desktop.
Save akarnokd/9950338 to your computer and use it in GitHub Desktop.
package rxjava;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.subscriptions.Subscriptions;
public abstract class Scheduler {
public abstract int degreeOfParallelism();
public long now() {
return System.currentTimeMillis();
}
public Subscription schedule(Action1<Inner> action) {
Inner inner = createInner();
inner.schedule(action);
return inner;
}
public Subscription schedule(Action1<Inner> action, long delay, TimeUnit unit) {
Inner inner = createInner();
inner.schedule(action, delay, unit);
return inner;
}
public Subscription schedulePeriodically(Action1<Inner> action, long initialDelay, long period, TimeUnit unit) {
Inner inner = createInner();
inner.schedule(new PeriodicAction(action, unit.toNanos(period)), initialDelay, unit);
return inner;
}
private static final class PeriodicAction implements Action1<Inner> {
private final Action1<Inner> actual;
private final long periodInNanos;
public PeriodicAction(Action1<Inner> actual, long periodInNanos) {
this.actual = actual;
this.periodInNanos = periodInNanos;
}
@Override
public void call(Inner t1) {
if (!t1.isUnsubscribed()) {
long start = t1.now();
actual.call(t1);
long delta = t1.now() - start;
long remaining = periodInNanos - TimeUnit.MILLISECONDS.toNanos(delta);
t1.schedule(this, remaining, TimeUnit.NANOSECONDS);
}
}
}
public abstract Inner createInner();
public static abstract class Inner implements Subscription {
protected final ThreadLocal<Action1<Inner>> currentAction = new ThreadLocal<Action1<Inner>>();
protected void setCurrentAction(Action1<Inner> action) {
currentAction.set(action);
}
protected Action1<Inner> getCurrentAction() {
return currentAction.get();
}
protected void clearCurrentAction() {
setCurrentAction(null);
}
public long now() {
return System.currentTimeMillis();
}
public void schedule() {
schedule(0, TimeUnit.NANOSECONDS);
}
public void schedule(long delay, TimeUnit unit) {
Action1<Inner> action = getCurrentAction();
if (action == null) {
throw new IllegalStateException("No current action set.");
}
schedule(action, delay, unit);
}
public void schedule(Action1<Inner> action) {
schedule(action, 0, TimeUnit.NANOSECONDS);
}
public abstract void schedule(Action1<Inner> action, long delay, TimeUnit unit);
}
public static final class NewThreadScheduler extends Scheduler {
@Override
public int degreeOfParallelism() {
return 1;
}
@Override
public Inner createInner() {
return new NewThreadInner();
}
private static final class NewThreadInner extends Inner {
final ScheduledExecutorService executor;
final CompositeSubscription csub;
public NewThreadInner() {
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
csub = new CompositeSubscription();
csub.add(Subscriptions.create(new Action0() {
@Override
public void call() {
executor.shutdown();
}
}));
}
@Override
public void schedule(Action1<Inner> action, long delay, TimeUnit unit) {
DeleteUnsubscribe du = new DeleteUnsubscribe();
csub.add(du);
Subscription ssub = Subscriptions.from(executor.schedule(new ActionRunner(action, du), delay, unit));
du.set(ssub);
}
private final class ActionRunner implements Runnable {
final Action1<Inner> action;
final DeleteUnsubscribe du;
public ActionRunner(Action1<Inner> action, DeleteUnsubscribe du) {
this.action = action;
this.du = du;
}
@Override
public void run() {
du.delete();
csub.remove(du);
setCurrentAction(action);
try {
action.call(NewThreadInner.this);
} finally {
clearCurrentAction();
}
}
}
@Override
public boolean isUnsubscribed() {
return csub.isUnsubscribed();
}
@Override
public void unsubscribe() {
csub.unsubscribe();
}
}
}
static final class DeleteUnsubscribe implements Subscription {
final MultipleAssignmentSubscription outer;
public DeleteUnsubscribe() {
this.outer = new MultipleAssignmentSubscription();
this.outer.set(new MultipleAssignmentSubscription());
}
public void set(Subscription s) {
((MultipleAssignmentSubscription)outer.get()).set(s);
}
public void delete() {
outer.set(Subscriptions.empty());
}
@Override
public boolean isUnsubscribed() {
return outer.isUnsubscribed();
}
@Override
public void unsubscribe() {
outer.unsubscribe();
}
}
public static void main(String[] args) throws Exception {
Scheduler s = new NewThreadScheduler();
final CountDownLatch cdl = new CountDownLatch(5);
s.schedule(new Action1<Inner>() {
@Override
public void call(Inner t1) {
long c = cdl.getCount();
System.out.println(Thread.currentThread() + " " + c);
if (c > 1) {
t1.schedule(100, TimeUnit.MILLISECONDS);
}
cdl.countDown();
}
}, 100, TimeUnit.MILLISECONDS);
cdl.await();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment