Created
April 8, 2013 08:18
-
-
Save coderplay/5335125 to your computer and use it in GitHub Desktop.
Using PriorityBlockingQueue in thread pool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates running prioritized tasks on a {@link ThreadPoolExecutor} whose work queue
 * is a {@link PriorityBlockingQueue}.
 */
public class PBQvsThreadPool {

    /**
     * A {@link FutureTask} that carries an integer priority so that it can be ordered by a
     * {@link PriorityBlockingQueue}. Tasks with a smaller priority value sort first.
     *
     * <p>Two tasks with the same priority compare as equal; the queue gives no guarantee
     * about the relative order in which such tasks are dequeued.
     *
     * @param <V> the result type returned by this task's {@code get} methods
     */
    static class ComparableFutureTask<V> extends FutureTask<V> implements
            Runnable, Comparable<ComparableFutureTask<V>> {

        /** Ordering key; never changes after construction. */
        private final int priority;

        /**
         * Creates a task that runs {@code callable} when executed.
         *
         * @param callable the computation to run
         * @param priority the ordering key; smaller values sort first
         */
        public ComparableFutureTask(Callable<V> callable, int priority) {
            super(callable);
            this.priority = priority;
        }

        /**
         * Creates a task that runs {@code runnable} and then yields {@code result}.
         *
         * @param runnable the work to run
         * @param result the value returned by {@code get} on successful completion
         * @param priority the ordering key; smaller values sort first
         */
        public ComparableFutureTask(Runnable runnable, V result, int priority) {
            super(runnable, result);
            this.priority = priority;
        }

        /**
         * Orders tasks by ascending priority value.
         *
         * @param o the task to compare against
         * @return a negative, zero or positive number when this task's priority is less
         *     than, equal to or greater than that of {@code o}
         */
        @Override
        public int compareTo(ComparableFutureTask<V> o) {
            // Integer.compare instead of subtraction: "this.priority - o.priority"
            // overflows for distant values (e.g. Integer.MIN_VALUE vs 1) and would
            // report the wrong sign.
            return Integer.compare(this.priority, o.priority);
        }
    }

    /**
     * A {@link ThreadPoolExecutor} that only accepts a {@link PriorityBlockingQueue} as its
     * work queue and adds {@code submit} overloads taking an explicit priority.
     *
     * <p>Tasks handed to the inherited {@code submit} overloads are wrapped with
     * {@link #DEFAULT_PRIORITY} so that they can share the queue with prioritized tasks.
     * Tasks passed directly to {@link #execute(Runnable)} are not wrapped; the queue
     * rejects them with a {@link ClassCastException} unless they are comparable with the
     * other queued tasks.
     *
     * <p>Because the queue is unbounded, the executor never grows beyond
     * {@code corePoolSize} threads; {@code maximumPoolSize} has no practical effect.
     */
    static class PBQThreadPoolExecutor extends ThreadPoolExecutor {

        /** Priority given to tasks submitted without an explicit priority. */
        static final int DEFAULT_PRIORITY = 0;

        /**
         * Creates an executor with the default thread factory and rejection handler.
         *
         * @param corePoolSize number of threads to keep in the pool
         * @param maximumPoolSize maximum number of threads allowed in the pool
         * @param keepAliveTime idle time after which excess threads terminate
         * @param unit time unit of {@code keepAliveTime}
         * @param workQueue priority queue holding tasks before they are executed
         */
        public PBQThreadPoolExecutor(int corePoolSize,
                                     int maximumPoolSize,
                                     long keepAliveTime,
                                     TimeUnit unit,
                                     PriorityBlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        /**
         * Creates an executor with the given thread factory and the default rejection handler.
         *
         * @param corePoolSize number of threads to keep in the pool
         * @param maximumPoolSize maximum number of threads allowed in the pool
         * @param keepAliveTime idle time after which excess threads terminate
         * @param unit time unit of {@code keepAliveTime}
         * @param workQueue priority queue holding tasks before they are executed
         * @param threadFactory factory used to create worker threads
         */
        public PBQThreadPoolExecutor(int corePoolSize,
                                     int maximumPoolSize,
                                     long keepAliveTime,
                                     TimeUnit unit,
                                     PriorityBlockingQueue<Runnable> workQueue,
                                     ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }

        /**
         * Creates an executor with the default thread factory and the given rejection handler.
         *
         * @param corePoolSize number of threads to keep in the pool
         * @param maximumPoolSize maximum number of threads allowed in the pool
         * @param keepAliveTime idle time after which excess threads terminate
         * @param unit time unit of {@code keepAliveTime}
         * @param workQueue priority queue holding tasks before they are executed
         * @param handler handler invoked when a task cannot be accepted
         */
        public PBQThreadPoolExecutor(int corePoolSize,
                                     int maximumPoolSize,
                                     long keepAliveTime,
                                     TimeUnit unit,
                                     PriorityBlockingQueue<Runnable> workQueue,
                                     RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }

        /**
         * Creates an executor with the given thread factory and rejection handler.
         *
         * @param corePoolSize number of threads to keep in the pool
         * @param maximumPoolSize maximum number of threads allowed in the pool
         * @param keepAliveTime idle time after which excess threads terminate
         * @param unit time unit of {@code keepAliveTime}
         * @param workQueue priority queue holding tasks before they are executed
         * @param threadFactory factory used to create worker threads
         * @param handler handler invoked when a task cannot be accepted
         */
        public PBQThreadPoolExecutor(int corePoolSize,
                                     int maximumPoolSize,
                                     long keepAliveTime,
                                     TimeUnit unit,
                                     PriorityBlockingQueue<Runnable> workQueue,
                                     ThreadFactory threadFactory,
                                     RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                    threadFactory, handler);
        }

        /**
         * Submits a runnable task with an explicit priority.
         *
         * @param task the work to run
         * @param result the value the returned future yields on successful completion
         * @param priority the ordering key; smaller values are dequeued first
         * @param <T> the type of {@code result}
         * @return a future representing pending completion of the task
         * @throws NullPointerException if {@code task} is null
         */
        public <T> Future<T> submit(Runnable task, T result, int priority) {
            Objects.requireNonNull(task, "task");
            RunnableFuture<T> ftask = newTaskFor(task, result, priority);
            execute(ftask);
            return ftask;
        }

        /**
         * Submits a value-returning task with an explicit priority.
         *
         * @param task the computation to run
         * @param priority the ordering key; smaller values are dequeued first
         * @param <T> the type of the task's result
         * @return a future representing pending completion of the task
         * @throws NullPointerException if {@code task} is null
         */
        public <T> Future<T> submit(Callable<T> task, int priority) {
            Objects.requireNonNull(task, "task");
            RunnableFuture<T> ftask = newTaskFor(task, priority);
            execute(ftask);
            return ftask;
        }

        /**
         * Wraps tasks coming from the inherited {@code submit} overloads with
         * {@link #DEFAULT_PRIORITY}. Without this, such tasks would be plain
         * {@link FutureTask}s, which the priority queue cannot compare.
         */
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return newTaskFor(runnable, value, DEFAULT_PRIORITY);
        }

        /**
         * Wraps tasks coming from the inherited {@code submit} overloads with
         * {@link #DEFAULT_PRIORITY}. Without this, such tasks would be plain
         * {@link FutureTask}s, which the priority queue cannot compare.
         */
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return newTaskFor(callable, DEFAULT_PRIORITY);
        }

        /**
         * Builds the queueable task for a runnable with the given priority.
         *
         * @param runnable the work to run
         * @param value the value the task yields on successful completion
         * @param priority the ordering key; smaller values are dequeued first
         * @param <T> the type of {@code value}
         * @return a new {@link ComparableFutureTask}
         */
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value,
                                                   int priority) {
            return new ComparableFutureTask<T>(runnable, value, priority);
        }

        /**
         * Builds the queueable task for a callable with the given priority.
         *
         * @param callable the computation to run
         * @param priority the ordering key; smaller values are dequeued first
         * @param <T> the type of the callable's result
         * @return a new {@link ComparableFutureTask}
         */
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable,
                                                   int priority) {
            return new ComparableFutureTask<T>(callable, priority);
        }
    }

    /**
     * Launcher entry point; delegates to {@link #main()}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        main();
    }

    /**
     * Submits a single prioritized task to a freshly created pool and then shuts the
     * pool down.
     */
    public static void main() {
        PriorityBlockingQueue<Runnable> pbq =
                new PriorityBlockingQueue<Runnable>(1024);
        PBQThreadPoolExecutor pool =
                new PBQThreadPoolExecutor(1024, 2048, 1000L, TimeUnit.MILLISECONDS, pbq);
        try {
            final int priority = 1;
            pool.submit(new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    return 1L;
                }
            }, priority);
        } finally {
            // Core threads do not time out by default, so without an orderly shutdown
            // the worker started above would keep the JVM alive. shutdown() still lets
            // the already submitted task run to completion.
            pool.shutdown();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment