Created
April 3, 2014 08:08
-
-
Save akarnokd/9950338 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package rxjava; | |
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.subscriptions.Subscriptions;
public abstract class Scheduler { | |
public abstract int degreeOfParallelism(); | |
public long now() { | |
return System.currentTimeMillis(); | |
} | |
public Subscription schedule(Action1<Inner> action) { | |
Inner inner = createInner(); | |
inner.schedule(action); | |
return inner; | |
} | |
public Subscription schedule(Action1<Inner> action, long delay, TimeUnit unit) { | |
Inner inner = createInner(); | |
inner.schedule(action, delay, unit); | |
return inner; | |
} | |
public Subscription schedulePeriodically(Action1<Inner> action, long initialDelay, long period, TimeUnit unit) { | |
Inner inner = createInner(); | |
inner.schedule(new PeriodicAction(action, unit.toNanos(period)), initialDelay, unit); | |
return inner; | |
} | |
private static final class PeriodicAction implements Action1<Inner> { | |
private final Action1<Inner> actual; | |
private final long periodInNanos; | |
public PeriodicAction(Action1<Inner> actual, long periodInNanos) { | |
this.actual = actual; | |
this.periodInNanos = periodInNanos; | |
} | |
@Override | |
public void call(Inner t1) { | |
if (!t1.isUnsubscribed()) { | |
long start = t1.now(); | |
actual.call(t1); | |
long delta = t1.now() - start; | |
long remaining = periodInNanos - TimeUnit.MILLISECONDS.toNanos(delta); | |
t1.schedule(this, remaining, TimeUnit.NANOSECONDS); | |
} | |
} | |
} | |
public abstract Inner createInner(); | |
public static abstract class Inner implements Subscription { | |
protected final ThreadLocal<Action1<Inner>> currentAction = new ThreadLocal<Action1<Inner>>(); | |
protected void setCurrentAction(Action1<Inner> action) { | |
currentAction.set(action); | |
} | |
protected Action1<Inner> getCurrentAction() { | |
return currentAction.get(); | |
} | |
protected void clearCurrentAction() { | |
setCurrentAction(null); | |
} | |
public long now() { | |
return System.currentTimeMillis(); | |
} | |
public void schedule() { | |
schedule(0, TimeUnit.NANOSECONDS); | |
} | |
public void schedule(long delay, TimeUnit unit) { | |
Action1<Inner> action = getCurrentAction(); | |
if (action == null) { | |
throw new IllegalStateException("No current action set."); | |
} | |
schedule(action, delay, unit); | |
} | |
public void schedule(Action1<Inner> action) { | |
schedule(action, 0, TimeUnit.NANOSECONDS); | |
} | |
public abstract void schedule(Action1<Inner> action, long delay, TimeUnit unit); | |
} | |
public static final class NewThreadScheduler extends Scheduler { | |
@Override | |
public int degreeOfParallelism() { | |
return 1; | |
} | |
@Override | |
public Inner createInner() { | |
return new NewThreadInner(); | |
} | |
private static final class NewThreadInner extends Inner { | |
final ScheduledExecutorService executor; | |
final CompositeSubscription csub; | |
public NewThreadInner() { | |
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { | |
@Override | |
public Thread newThread(Runnable r) { | |
Thread t = new Thread(r); | |
t.setDaemon(true); | |
return t; | |
} | |
}); | |
csub = new CompositeSubscription(); | |
csub.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
executor.shutdown(); | |
} | |
})); | |
} | |
@Override | |
public void schedule(Action1<Inner> action, long delay, TimeUnit unit) { | |
DeleteUnsubscribe du = new DeleteUnsubscribe(); | |
csub.add(du); | |
Subscription ssub = Subscriptions.from(executor.schedule(new ActionRunner(action, du), delay, unit)); | |
du.set(ssub); | |
} | |
private final class ActionRunner implements Runnable { | |
final Action1<Inner> action; | |
final DeleteUnsubscribe du; | |
public ActionRunner(Action1<Inner> action, DeleteUnsubscribe du) { | |
this.action = action; | |
this.du = du; | |
} | |
@Override | |
public void run() { | |
du.delete(); | |
csub.remove(du); | |
setCurrentAction(action); | |
try { | |
action.call(NewThreadInner.this); | |
} finally { | |
clearCurrentAction(); | |
} | |
} | |
} | |
@Override | |
public boolean isUnsubscribed() { | |
return csub.isUnsubscribed(); | |
} | |
@Override | |
public void unsubscribe() { | |
csub.unsubscribe(); | |
} | |
} | |
} | |
static final class DeleteUnsubscribe implements Subscription { | |
final MultipleAssignmentSubscription outer; | |
public DeleteUnsubscribe() { | |
this.outer = new MultipleAssignmentSubscription(); | |
this.outer.set(new MultipleAssignmentSubscription()); | |
} | |
public void set(Subscription s) { | |
((MultipleAssignmentSubscription)outer.get()).set(s); | |
} | |
public void delete() { | |
outer.set(Subscriptions.empty()); | |
} | |
@Override | |
public boolean isUnsubscribed() { | |
return outer.isUnsubscribed(); | |
} | |
@Override | |
public void unsubscribe() { | |
outer.unsubscribe(); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
Scheduler s = new NewThreadScheduler(); | |
final CountDownLatch cdl = new CountDownLatch(5); | |
s.schedule(new Action1<Inner>() { | |
@Override | |
public void call(Inner t1) { | |
long c = cdl.getCount(); | |
System.out.println(Thread.currentThread() + " " + c); | |
if (c > 1) { | |
t1.schedule(100, TimeUnit.MILLISECONDS); | |
} | |
cdl.countDown(); | |
} | |
}, 100, TimeUnit.MILLISECONDS); | |
cdl.await(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment