Skip to content

Instantly share code, notes, and snippets.

@mnadeem
Last active October 27, 2021 21:36
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save mnadeem/5d01282ea4f86201ea407065e9d53cf3 to your computer and use it in GitHub Desktop.
Save mnadeem/5d01282ea4f86201ea407065e9d53cf3 to your computer and use it in GitHub Desktop.
Scalable Thread Pool Executor (TPE) which first creates threads up to the maximum pool size and then queues up the tasks (the queue does not depend upon the executor)
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
/**
 * A {@link ThreadPoolExecutor} that grows the pool up to {@code maximumPoolSize}
 * before it starts queueing tasks.
 *
 * <p>A stock {@code ThreadPoolExecutor} only creates threads beyond
 * {@code corePoolSize} once its work queue refuses a task. This class achieves
 * "threads first, queue second" with two cooperating pieces:
 * <ul>
 *   <li>{@link DynamicBlockingQueue} makes {@code offer(E)} succeed only when an
 *       idle worker is already waiting to receive the task, so the executor
 *       otherwise tries to start a new worker;</li>
 *   <li>{@link ForceQueuePolicy} is invoked once no further worker can be
 *       started and places the task on the queue instead of discarding it.</li>
 * </ul>
 *
 * <p>Because the scheme depends on that rejection policy, the handler cannot be
 * replaced; see {@link #setRejectedExecutionHandler(RejectedExecutionHandler)}.
 */
public final class ScalingThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * Creates an executor backed by a new {@link LinkedTransferQueue}.
     *
     * @param corePoolSize    number of threads to keep in the pool
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param keepAliveTime   how long excess idle threads wait for work before terminating
     * @param keepAliveUnit   time unit of {@code keepAliveTime}
     */
    public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit keepAliveUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, keepAliveUnit,
                new DynamicBlockingQueue<>(new LinkedTransferQueue<Runnable>()),
                new ForceQueuePolicy());
    }

    /**
     * Creates an executor backed by the supplied transfer queue.
     *
     * @param corePoolSize    number of threads to keep in the pool
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param keepAliveTime   how long excess idle threads wait for work before terminating
     * @param keepAliveUnit   time unit of {@code keepAliveTime}
     * @param workQueue       queue that holds tasks once the pool cannot grow further
     * @throws NullPointerException if {@code workQueue} is {@code null}
     */
    public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit keepAliveUnit, TransferQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, keepAliveUnit,
                new DynamicBlockingQueue<>(workQueue),
                new ForceQueuePolicy());
    }

    /**
     * Always fails: the built-in rejection policy is what moves tasks onto the
     * queue, so swapping it out would break the executor.
     *
     * @param handler ignored
     * @throws IllegalArgumentException always
     */
    @Override
    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
        throw new IllegalArgumentException("Cant set rejection handler");
    }

    /**
     * Rejection policy that enqueues the rejected task while the executor is
     * still running, and rejects it for real once the executor is shut down.
     */
    private static final class ForceQueuePolicy implements RejectedExecutionHandler {

        /**
         * Places {@code r} on the executor's work queue.
         *
         * @param r        the task that could not be handed to a worker
         * @param executor the executor that rejected the task
         * @throws RejectedExecutionException if the executor has been shut down,
         *         or if the calling thread is interrupted while enqueueing
         */
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // The handler is also invoked for submissions after shutdown; queueing
            // those would silently accept work that may never be executed.
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("Executor has been shut down");
            }
            try {
                // Rejected work is added to the queue.
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                // Not expected for an unbounded queue whose put never waits, but a
                // caller-supplied queue may block; keep the interrupt visible.
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException(e);
            }
            // Shutdown may have raced with the put. If the task is still queued,
            // take it back and report the rejection; if remove fails, a worker
            // already took the task and it will run.
            if (executor.isShutdown() && executor.remove(r)) {
                throw new RejectedExecutionException("Executor has been shut down");
            }
        }
    }

    /**
     * {@link TransferQueue} wrapper whose non-blocking {@link #offer(Object)}
     * only succeeds through {@code tryTransfer}; every other operation is
     * forwarded unchanged to the wrapped queue.
     *
     * @param <E> element type
     */
    private static final class DynamicBlockingQueue<E> implements TransferQueue<E> {

        /** The queue that actually stores the elements. */
        private final TransferQueue<E> delegate;

        /**
         * @param delegate the queue to wrap
         * @throws NullPointerException if {@code delegate} is {@code null}
         */
        public DynamicBlockingQueue(final TransferQueue<E> delegate) {
            // Fail fast: the executor's own null check only sees this wrapper.
            if (delegate == null) {
                throw new NullPointerException("delegate");
            }
            this.delegate = delegate;
        }

        /**
         * Hands the element over via {@code tryTransfer} instead of enqueueing it.
         * A {@code false} result is what makes the executor try to add a worker.
         */
        @Override
        public boolean offer(E o) {
            return tryTransfer(o);
        }

        /**
         * Adds the element to the wrapped queue.
         *
         * @throws IllegalStateException if the wrapped queue reports the element was not added
         */
        @Override
        public boolean add(E o) {
            if (this.delegate.add(o)) {
                return true;
            }
            // Not possible in our case
            throw new IllegalStateException("Queue full");
        }

        // ---- Everything below is plain delegation to the wrapped queue. ----

        @Override
        public E remove() {
            return this.delegate.remove();
        }

        @Override
        public E poll() {
            return this.delegate.poll();
        }

        @Override
        public E element() {
            return this.delegate.element();
        }

        @Override
        public E peek() {
            return this.delegate.peek();
        }

        @Override
        public int size() {
            return this.delegate.size();
        }

        @Override
        public boolean isEmpty() {
            return this.delegate.isEmpty();
        }

        @Override
        public Iterator<E> iterator() {
            return this.delegate.iterator();
        }

        @Override
        public Object[] toArray() {
            return this.delegate.toArray();
        }

        @Override
        public <T> T[] toArray(T[] a) {
            return this.delegate.toArray(a);
        }

        @Override
        public boolean containsAll(Collection<?> c) {
            return this.delegate.containsAll(c);
        }

        @Override
        public boolean addAll(Collection<? extends E> c) {
            return this.delegate.addAll(c);
        }

        @Override
        public boolean removeAll(Collection<?> c) {
            return this.delegate.removeAll(c);
        }

        @Override
        public boolean retainAll(Collection<?> c) {
            return this.delegate.retainAll(c);
        }

        @Override
        public void clear() {
            this.delegate.clear();
        }

        @Override
        public void put(E e) throws InterruptedException {
            this.delegate.put(e);
        }

        @Override
        public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
            return this.delegate.offer(e, timeout, unit);
        }

        @Override
        public E take() throws InterruptedException {
            return this.delegate.take();
        }

        @Override
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            return this.delegate.poll(timeout, unit);
        }

        @Override
        public int remainingCapacity() {
            return this.delegate.remainingCapacity();
        }

        @Override
        public boolean remove(Object o) {
            return this.delegate.remove(o);
        }

        @Override
        public boolean contains(Object o) {
            return this.delegate.contains(o);
        }

        @Override
        public int drainTo(Collection<? super E> c) {
            return this.delegate.drainTo(c);
        }

        @Override
        public int drainTo(Collection<? super E> c, int maxElements) {
            return this.delegate.drainTo(c, maxElements);
        }

        @Override
        public boolean tryTransfer(E e) {
            return this.delegate.tryTransfer(e);
        }

        @Override
        public void transfer(E e) throws InterruptedException {
            this.delegate.transfer(e);
        }

        @Override
        public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException {
            return this.delegate.tryTransfer(e, timeout, unit);
        }

        @Override
        public boolean hasWaitingConsumer() {
            return this.delegate.hasWaitingConsumer();
        }

        @Override
        public int getWaitingConsumerCount() {
            return this.delegate.getWaitingConsumerCount();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment