Skip to content

Instantly share code, notes, and snippets.

@kaedea
Created October 19, 2016 08:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kaedea/0a25ef586301da3bd26e8de1ca4de053 to your computer and use it in GitHub Desktop.
Save kaedea/0a25ef586301da3bd26e8de1ca4de053 to your computer and use it in GitHub Desktop.
Task dispatcher.
/**
 * Task dispatcher interface.
 * <p>
 * Use {@linkplain Maker} to build instances in a simple way.
 *
 * @author kaede
 * @version date 16/10/19
 */
public interface Dispatcher {

    /** Log tag used by the dispatcher implementations. */
    String TAG = "task.dispatcher";

    /**
     * Attaches an existing scheduler to the dispatcher, so that a new one does not
     * have to be created.
     *
     * @param scheduler handler used to post delayed tasks
     * @return a dispatcher, so that calls can be chained
     */
    Dispatcher attach(Handler scheduler);

    /**
     * Starts the dispatcher. It must be started before it can work.
     */
    void start();

    /**
     * Tells whether or not the dispatcher is working.
     *
     * @return {@code true} if the dispatcher is running
     */
    boolean isRunning();

    /**
     * Adds a task to the dispatcher.
     * Note that you should call {@linkplain #start()} before this.
     *
     * @param runnable the task to run
     */
    void add(Runnable runnable);

    /**
     * Schedules a task to be added to the dispatcher after a delay.
     * Note that you should call {@linkplain #start()} before this.
     *
     * @param runnable the task to run
     * @param millis   delay in milliseconds before the task is handed to the dispatcher
     */
    void schedule(Runnable runnable, long millis);

    /**
     * Intended to be invoked when a task has finished.
     * Note that you should not call this method directly.
     */
    void finish();

    /**
     * Terminates the dispatcher.
     */
    void shutdown();

    /**
     * Maker utility for {@linkplain Dispatcher}.
     * <p>
     * Holds static factory methods only, so it can neither be instantiated nor extended.
     */
    final class Maker {

        private Maker() {
            // Static factory holder, no instances.
        }

        /**
         * @param threadPoolSize size passed to the {@link ExecutorDispatcher} constructor
         * @return a new, not yet started, {@link ExecutorDispatcher}
         */
        public static ExecutorDispatcher newExecutorDispatcher(int threadPoolSize) {
            return new ExecutorDispatcher(threadPoolSize);
        }

        /**
         * @param threadPoolSize size passed to the {@link ExecutorDispatcher} constructor
         * @param capacity       capacity passed to the {@link ExecutorDispatcher} constructor
         * @return a new, not yet started, {@link ExecutorDispatcher}
         */
        public static ExecutorDispatcher newExecutorDispatcher(int threadPoolSize, int capacity) {
            return new ExecutorDispatcher(threadPoolSize, capacity);
        }

        /**
         * @param threadPoolSize size passed to the {@link ThreadDispatcher} constructor
         * @return a new, not yet started, {@link ThreadDispatcher}
         */
        public static ThreadDispatcher newThreadDispatcher(int threadPoolSize) {
            return new ThreadDispatcher(threadPoolSize);
        }

        /**
         * @param threadPoolSize size passed to the {@link ThreadDispatcher} constructor
         * @param capacity       capacity passed to the {@link ThreadDispatcher} constructor
         * @return a new, not yet started, {@link ThreadDispatcher}
         */
        public static ThreadDispatcher newThreadDispatcher(int threadPoolSize, int capacity) {
            return new ThreadDispatcher(threadPoolSize, capacity);
        }
    }
}
/**
 * Instrumentation tests for the {@link Dispatcher} implementations.
 * <p>
 * The tests are timing based: every {@link TestTask} sleeps for one second before it
 * increments the shared counter, and the test thread sleeps long enough for the expected
 * number of tasks to have completed before asserting on that counter.
 * <p>
 * Every test releases the dispatcher (and any thread it created itself) in a
 * {@code finally} block so that worker threads do not leak from one test into the next.
 *
 * @author kaede
 * @version date 16/10/19
 */
public class DispatcherApiTest extends InstrumentationTestCase {
    private Context mContext;

    @Override
    protected void setUp() throws Exception {
        super.setUp();
        mContext = getInstrumentation().getTargetContext();
    }

    /** All tasks added to an executor dispatcher complete. */
    public void testExecutor() throws InterruptedException {
        int size = 20;
        ExecutorDispatcher dispatcher = Dispatcher.Maker.newExecutorDispatcher(size);
        dispatcher.start();
        try {
            AtomicInteger count = new AtomicInteger(0);
            for (int i = 0; i < size; i++) {
                dispatcher.add(new TestTask(i, count));
            }
            Thread.sleep(2000);
            assertEquals(size, count.get());
        } finally {
            dispatcher.shutdown();
        }
    }

    /** Scheduled tasks only run after their delay has elapsed. */
    public void testExecutorSchedule() throws InterruptedException {
        int size = 20;
        ExecutorDispatcher dispatcher = Dispatcher.Maker.newExecutorDispatcher(size);
        dispatcher.start();
        try {
            AtomicInteger count = new AtomicInteger(0);
            for (int i = 0; i < size / 2; i++) {
                dispatcher.add(new TestTask(i, count));
            }
            for (int i = 0; i < size / 2; i++) {
                dispatcher.schedule(new TestTask(i, count), 3000);
            }
            Thread.sleep(2000);
            assertEquals(size / 2, count.get());
            Thread.sleep(3000);
            assertEquals(size, count.get());
        } finally {
            dispatcher.shutdown();
        }
    }

    /** An attached executor is the one that actually runs the tasks. */
    public void testExecutorWithCustomExecutor() throws InterruptedException {
        int size = 20;
        // Atomic counter: the factory may be invoked from a thread other than the test thread.
        final AtomicInteger threadSize = new AtomicInteger(0);
        ExecutorDispatcher dispatcher = Dispatcher.Maker.newExecutorDispatcher(size);
        dispatcher.attach(Executors.newCachedThreadPool(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                threadSize.incrementAndGet();
                Logger.d("[CustomDispatcher] create thread");
                return new Thread(r);
            }
        }));
        dispatcher.start();
        try {
            AtomicInteger count = new AtomicInteger(0);
            for (int i = 0; i < size; i++) {
                dispatcher.add(new TestTask(i, count));
            }
            Thread.sleep(2000);
            assertEquals(size, count.get());
            assertEquals(size, threadSize.get());
        } finally {
            dispatcher.shutdown();
        }
    }

    /** An attached scheduler is the one that dispatches the delayed tasks. */
    public void testExecutorWithCustomScheduler() throws InterruptedException {
        int size = 20;
        // Incremented on the handler thread and read on the test thread.
        final AtomicInteger scheduleSize = new AtomicInteger(0);
        ExecutorDispatcher dispatcher = Dispatcher.Maker.newExecutorDispatcher(size);
        HandlerThread handlerThread = new HandlerThread("worker-handler");
        handlerThread.start();
        try {
            Handler scheduler = new Handler(handlerThread.getLooper()) {
                @Override
                public void dispatchMessage(Message msg) {
                    scheduleSize.incrementAndGet();
                    super.dispatchMessage(msg);
                }
            };
            dispatcher.attach(scheduler);
            dispatcher.start();
            AtomicInteger count = new AtomicInteger(0);
            for (int i = 0; i < size / 2; i++) {
                dispatcher.add(new TestTask(i, count));
            }
            for (int i = 0; i < size / 2; i++) {
                dispatcher.schedule(new TestTask(i, count), 3000);
            }
            Thread.sleep(2000);
            assertEquals(size / 2, count.get());
            assertEquals(0, scheduleSize.get());
            Thread.sleep(3000);
            assertEquals(size, count.get());
            assertEquals(size / 2, scheduleSize.get());
        } finally {
            dispatcher.shutdown();
            // The test owns this thread; the dispatcher only borrowed its handler.
            handlerThread.quit();
        }
    }

    /** All tasks added to a thread dispatcher complete. */
    public void testThread() throws InterruptedException {
        int size = 20;
        ThreadDispatcher dispatcher = Dispatcher.Maker.newThreadDispatcher(size);
        dispatcher.start();
        try {
            AtomicInteger count = new AtomicInteger(0);
            for (int i = 0; i < size; i++) {
                dispatcher.add(new TestTask(i, count));
            }
            Thread.sleep(2000);
            assertEquals(size, count.get());
        } finally {
            dispatcher.shutdown();
        }
    }

    /** Scheduled tasks of a thread dispatcher only run after their delay has elapsed. */
    public void testThreadSchedule() throws InterruptedException {
        int size = 20;
        ThreadDispatcher dispatcher = Dispatcher.Maker.newThreadDispatcher(size);
        dispatcher.start();
        try {
            AtomicInteger count = new AtomicInteger(0);
            for (int i = 0; i < size / 2; i++) {
                dispatcher.add(new TestTask(i, count));
            }
            for (int i = 0; i < size / 2; i++) {
                dispatcher.schedule(new TestTask(i, count), 3000);
            }
            Thread.sleep(2000);
            assertEquals(size / 2, count.get());
            Thread.sleep(5000);
            assertEquals(size, count.get());
        } finally {
            dispatcher.shutdown();
        }
    }

    /**
     * Task that sleeps for one second and then increments a shared counter.
     * Tasks are ordered by id so that they can be stored in a priority queue.
     */
    private static class TestTask implements Runnable, Comparable<TestTask> {
        final int mId;
        final AtomicInteger mCount;
        boolean mFinished = false;

        public TestTask(int id, AtomicInteger count) {
            mId = id;
            mCount = count;
        }

        @Override
        public void run() {
            Log.d(Dispatcher.TAG, "task started, id = " + mId);
            try {
                Thread.sleep(1000);
                mCount.incrementAndGet();
                Log.d(Dispatcher.TAG, "task finished, id = " + mId);
                mFinished = true;
            } catch (InterruptedException e) {
                Log.w(Dispatcher.TAG, "task interrupted, id = " + mId, e);
                // Restore the flag so that the thread running this task can observe the
                // interruption and stop, instead of silently carrying on.
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public int compareTo(TestTask another) {
            // Explicit comparison instead of subtraction, which can overflow.
            if (mId < another.mId) {
                return -1;
            }
            return mId == another.mId ? 0 : 1;
        }
    }
}
/**
 * Task dispatcher impl with ExecutorService.
 * <p>
 * Worker dispatcher backed by an {@link ExecutorService}. Tasks rejected by the executor are
 * parked in a pending queue and re-submitted, one at a time, whenever {@link #finish()} is
 * called.
 * <p>
 * The embedded executor uses a {@link PriorityBlockingQueue} as its work queue, so tasks
 * that end up queued are expected to be mutually {@link Comparable}.
 * <p>
 * Use {@link #attach(ExecutorService)} to set an existing executor; if you do not use the
 * embedded executor, you have to deal with a bounded work queue yourself.
 * <p>
 * Use {@link #attach(Handler)} to set an existing handler, which is used to schedule tasks
 * with the executor. An attached handler is never shut down by this class; a scheduler
 * thread created internally by {@link #schedule(Runnable, long)} is quit in
 * {@link #shutdown()}.
 *
 * @author Kaede
 * @version 2016-10-18
 */
public class ExecutorDispatcher implements Dispatcher, ThreadFactory, RejectedExecutionHandler {
    /** Time, in seconds, an idle worker thread is kept alive. */
    private static final long KEEP_ALIVE_SECONDS = 30L;

    private final int mCorePoolSize;
    private final int mMaximumPoolSize;
    private final PriorityBlockingQueue<Runnable> mWorkQueue;
    private final LinkedBlockingQueue<Runnable> mPendingQueue = new LinkedBlockingQueue<>();
    private final AtomicInteger mCount = new AtomicInteger(1);
    private Handler mScheduler;
    /** Scheduler thread created by this dispatcher itself; {@code null} if none was created. */
    private HandlerThread mOwnedSchedulerThread;
    private ExecutorService mExecutor;

    /**
     * @param threadPoolSize core pool size of the embedded executor; the maximum pool size
     *                       is twice this value
     */
    public ExecutorDispatcher(int threadPoolSize) {
        this(threadPoolSize, new PriorityBlockingQueue<Runnable>());
    }

    /**
     * @param threadPoolSize core pool size of the embedded executor; the maximum pool size
     *                       is twice this value
     * @param capacity       initial capacity of the work queue
     */
    public ExecutorDispatcher(int threadPoolSize, int capacity) {
        this(threadPoolSize, new PriorityBlockingQueue<Runnable>(capacity));
    }

    /** Shared initialisation for the public constructors. */
    private ExecutorDispatcher(int threadPoolSize, PriorityBlockingQueue<Runnable> workQueue) {
        mCorePoolSize = threadPoolSize;
        mMaximumPoolSize = threadPoolSize * 2;
        mWorkQueue = workQueue;
    }

    /**
     * Sets the handler used by {@link #schedule(Runnable, long)}. Only the first scheduler
     * is kept; later calls are ignored.
     *
     * @param scheduler handler used to post delayed tasks
     * @return this dispatcher
     */
    @Override
    public ExecutorDispatcher attach(Handler scheduler) {
        if (mScheduler == null) {
            mScheduler = scheduler;
        } else {
            if (BuildConfig.DEBUG) {
                Log.w(TAG, "scheduler has been initialized once.");
            }
        }
        return this;
    }

    /**
     * Sets the executor used to run tasks. Only the first executor is kept; later calls,
     * and calls made after {@link #start()} created the embedded executor, are ignored.
     *
     * @param executor executor that runs the tasks
     * @return this dispatcher
     */
    public ExecutorDispatcher attach(ExecutorService executor) {
        if (mExecutor == null) {
            mExecutor = executor;
        } else {
            if (BuildConfig.DEBUG) {
                Log.w(TAG, "executor has been initialized once.");
            }
        }
        return this;
    }

    /**
     * @return the executor in use, or {@code null} if none was attached or created yet
     */
    public ExecutorService getExecutor() {
        return mExecutor;
    }

    /**
     * Creates the embedded executor unless an executor is already present.
     */
    @Override
    public void start() {
        if (mExecutor == null) {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(mCorePoolSize, mMaximumPoolSize,
                    KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, mWorkQueue, this, this);
            // Let idle core threads die as well.
            executor.allowCoreThreadTimeOut(true);
            mExecutor = executor;
        } else {
            if (BuildConfig.DEBUG) {
                Log.w(TAG, "dispatcher has already started once.");
            }
        }
    }

    @Override
    public boolean isRunning() {
        return mExecutor != null && !mExecutor.isShutdown();
    }

    /**
     * Hands the task to the executor.
     *
     * @throws IllegalStateException if no executor is available yet
     */
    @Override
    public void add(Runnable runnable) {
        if (mExecutor == null) {
            throw new IllegalStateException("pls call #start to initialize.");
        }
        mExecutor.execute(runnable);
    }

    /**
     * Hands the task to the executor after {@code millis} milliseconds. A negative delay is
     * treated as zero. If no scheduler was attached, a scheduler thread is created on first
     * use.
     *
     * @throws IllegalStateException if no executor is available yet
     */
    @Override
    public void schedule(final Runnable runnable, long millis) {
        if (mExecutor == null) {
            throw new IllegalStateException("pls call #start to initialize.");
        }
        if (mScheduler == null) {
            if (BuildConfig.DEBUG) {
                Log.d(TAG, "create thread-executor-scheduler");
            }
            HandlerThread thread = new HandlerThread("thread-executor-scheduler");
            thread.start();
            // Remember the thread so that shutdown() can release it.
            mOwnedSchedulerThread = thread;
            mScheduler = new Handler(thread.getLooper());
        }
        if (millis < 0) millis = 0;
        boolean posted = mScheduler.postDelayed(new Runnable() {
            @Override
            public void run() {
                if (BuildConfig.DEBUG) Log.d(TAG, "execute task");
                mExecutor.execute(runnable);
            }
        }, millis);
        // postDelayed fails when the scheduler's looper is exiting, e.g. after shutdown().
        if (!posted && BuildConfig.DEBUG) {
            Log.w(TAG, "scheduler rejected the task, it will not be executed");
        }
    }

    /**
     * Re-submits one task from the pending queue, if there is any.
     */
    @Override
    public void finish() {
        // Poll from pending queue when a task is finished.
        if (BuildConfig.DEBUG) {
            Log.d(TAG, "task finish, check pending queue");
        }
        Runnable pending = mPendingQueue.poll();
        if (pending != null) {
            add(pending);
        }
    }

    /**
     * Shuts down the executor and quits the scheduler thread created by this dispatcher, if
     * any. An attached scheduler is left untouched because the dispatcher does not own it.
     */
    @Override
    public void shutdown() {
        if (mExecutor != null) {
            mExecutor.shutdown();
        }
        if (mOwnedSchedulerThread != null) {
            mOwnedSchedulerThread.quit();
            mOwnedSchedulerThread = null;
        }
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, "ExecutorDispatcher #" + mCount.getAndIncrement());
        if (BuildConfig.DEBUG) {
            Log.d(TAG, "executor new thread : " + thread.getName());
        }
        return thread;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Offer pending queue when the executor's working queue is bounded.
        // Note that work queue (PriorityBlockingQueue is an unbounded queue) capacity is
        // INTEGER_MAX, therefore it will not be easily bounded and this is only a protection.
        if (BuildConfig.DEBUG) Log.d(TAG, "bounded work queue, pend task");
        mPendingQueue.offer(r);
    }
}
/**
 * Task dispatcher impl with Thread.
 * <p>
 * A fixed number of worker threads take tasks from a shared {@link PriorityBlockingQueue},
 * so tasks are expected to be mutually {@link Comparable}.
 * <p>
 * Use {@link #attach(Handler)} to set an existing handler, which is used to schedule tasks
 * onto the work queue. An attached handler is never shut down by this class; a scheduler
 * thread created internally by {@link #schedule(Runnable, long)} is quit in
 * {@link #shutdown()}.
 *
 * @author kaede
 * @version date 16/10/19
 */
public class ThreadDispatcher implements Dispatcher {
    private final int mDispatcherSize;
    private final AtomicInteger mCount = new AtomicInteger(1);
    private final PriorityBlockingQueue<Runnable> mWorkQueue;
    private Handler mScheduler;
    /** Scheduler thread created by this dispatcher itself; {@code null} if none was created. */
    private HandlerThread mOwnedSchedulerThread;
    private DispatcherThread[] mDispatchers;

    /**
     * @param threadPoolSize number of worker threads created by {@link #start()}
     */
    public ThreadDispatcher(int threadPoolSize) {
        mDispatcherSize = threadPoolSize;
        mWorkQueue = new PriorityBlockingQueue<>();
    }

    /**
     * @param threadPoolSize number of worker threads created by {@link #start()}
     * @param capacity       initial capacity of the work queue
     */
    public ThreadDispatcher(int threadPoolSize, int capacity) {
        mDispatcherSize = threadPoolSize;
        mWorkQueue = new PriorityBlockingQueue<>(capacity);
    }

    /**
     * Sets the handler used by {@link #schedule(Runnable, long)}. Only the first scheduler
     * is kept; later calls are ignored.
     *
     * @param scheduler handler used to post delayed tasks
     * @return this dispatcher
     */
    @Override
    public ThreadDispatcher attach(Handler scheduler) {
        if (mScheduler == null) {
            mScheduler = scheduler;
        } else {
            if (BuildConfig.DEBUG) {
                Log.w(TAG, "scheduler has been initialized once.");
            }
        }
        return this;
    }

    /**
     * Creates and starts the worker threads unless they are already running.
     */
    @Override
    public void start() {
        if (mDispatchers == null || mDispatchers.length == 0) {
            DispatcherThread[] workers = new DispatcherThread[mDispatcherSize];
            for (int i = 0; i < workers.length; i++) {
                workers[i] = new DispatcherThread();
                workers[i].start();
            }
            mDispatchers = workers;
        } else {
            if (BuildConfig.DEBUG) {
                Log.w(TAG, "dispatcher has already started once.");
            }
        }
    }

    @Override
    public boolean isRunning() {
        return mDispatchers != null && mDispatchers.length > 0;
    }

    /**
     * Puts the task on the work queue.
     *
     * @throws IllegalStateException if the dispatcher has not been started
     */
    @Override
    public void add(Runnable runnable) {
        if (mDispatchers == null) {
            throw new IllegalStateException("pls call #start to initialize.");
        }
        mWorkQueue.offer(runnable);
    }

    /**
     * Puts the task on the work queue after {@code millis} milliseconds. A negative delay is
     * treated as zero. If no scheduler was attached, a scheduler thread is created on first
     * use.
     *
     * @throws IllegalStateException if the dispatcher has not been started
     */
    @Override
    public void schedule(final Runnable runnable, long millis) {
        if (mDispatchers == null) {
            throw new IllegalStateException("pls call #start to initialize.");
        }
        if (mScheduler == null) {
            if (BuildConfig.DEBUG) {
                Log.d(TAG, "create thread-executor-scheduler");
            }
            HandlerThread thread = new HandlerThread("thread-executor-scheduler");
            thread.start();
            // Remember the thread so that shutdown() can release it.
            mOwnedSchedulerThread = thread;
            mScheduler = new Handler(thread.getLooper());
        }
        if (millis < 0) millis = 0;
        mScheduler.postDelayed(new Runnable() {
            @Override
            public void run() {
                if (BuildConfig.DEBUG) {
                    Log.d(TAG, "execute task");
                }
                mWorkQueue.offer(runnable);
            }
        }, millis);
    }

    /**
     * No-op: the worker threads take the next task from the queue on their own.
     */
    @Override
    public void finish() {
    }

    /**
     * Stops all worker threads and quits the scheduler thread created by this dispatcher, if
     * any. An attached scheduler is left untouched because the dispatcher does not own it.
     */
    @Override
    public void shutdown() {
        if (mDispatchers != null) {
            for (int i = 0; i < mDispatchers.length; i++) {
                mDispatchers[i].quit();
                mDispatchers[i] = null;
            }
            mDispatchers = null;
        }
        if (mOwnedSchedulerThread != null) {
            mOwnedSchedulerThread.quit();
            mOwnedSchedulerThread = null;
            // The handler is bound to the thread that was just quit; drop it so that a
            // later schedule() creates a fresh one instead of posting to a dead looper.
            mScheduler = null;
        }
    }

    /**
     * Worker thread that runs tasks taken from the work queue until {@link #quit()} is called.
     */
    private class DispatcherThread extends Thread {
        /** Set by {@link #quit()}; checked by the run loop to decide whether to exit. */
        private volatile boolean mQuit = false;

        public DispatcherThread() {
            // Take the sequence number once, so that the log matches the thread name.
            String name = "ThreadDispatcher #" + mCount.getAndIncrement();
            setName(name);
            if (BuildConfig.DEBUG) {
                Log.d(TAG, "create " + name);
            }
        }

        @Override
        public void run() {
            Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
            while (!mQuit) {
                final Runnable runnable;
                try {
                    runnable = mWorkQueue.take();
                } catch (InterruptedException e) {
                    if (BuildConfig.DEBUG) {
                        Log.d(TAG, "BlockingQueue is interrupted.");
                    }
                    // take() clears the interrupt flag when it throws, so the quit flag is
                    // what tells a shutdown apart from a stray interrupt.
                    if (mQuit) {
                        return;
                    }
                    continue;
                }
                runnable.run();
                if (isInterrupted()) {
                    if (BuildConfig.DEBUG) {
                        Log.d(TAG, "Dispatcher is interrupted.");
                    }
                    break;
                }
            }
        }

        /**
         * Asks the thread to stop: a task in progress is interrupted, and a thread blocked
         * on the empty queue is woken up so that it can exit.
         */
        public void quit() {
            mQuit = true;
            interrupt();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment