
@eymar
Last active February 27, 2018 14:10
---
layout: post
title: j.u.c ExecutorService gotcha
---

The java.util.concurrent ExecutorService provides a simple way to use a thread pool within a Java application. I have seen the following mistake in more than one place (including some quite well-known open source projects), so I thought it made sense to blog about it.

One of the most common scenarios for using a thread pool is a pool with an unbounded queue and a minimum and maximum number of threads. With the executor service, the following variants are quite simple to create:

A fixed size thread pool with an unbounded queue. Problematic in our case, since we want distinct minimum and maximum thread pool sizes.

// similar to j.u.c.Executors.newFixedThreadPool(int nThreads)
new ThreadPoolExecutor(nThreads, nThreads,
                       0L, TimeUnit.MILLISECONDS,
                       new LinkedBlockingQueue<Runnable>());

A cached thread pool that creates threads as needed and reuses them when possible (problematic in our case, since the maximum number of threads is effectively unbounded):

// similar to j.u.c.Executors.newCachedThreadPool()
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>());

The problem starts when one needs to create an unbounded thread pool with minimum and maximum threads. What most people do is the following:

// core and maximum pool sizes with an unbounded queue -- looks right, but isn't
new ThreadPoolExecutor(coreThreads, maximumThreads,
                       60L, TimeUnit.SECONDS,
                       new LinkedBlockingQueue<Runnable>());

This construction of thread pool will simply not work as expected. ThreadPoolExecutor only creates threads beyond the core pool size when it fails to offer a task to the queue. Since our LinkedBlockingQueue is unbounded, offering a task always succeeds, which means the pool will never grow beyond the core pool size toward the maximum pool size.
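A minimal, self-contained way to observe this gotcha (the class name, pool sizes, and task counts here are my own demo scaffolding, not from the original post): submit more blocking tasks than the core size to a pool backed by an unbounded LinkedBlockingQueue and check the pool size.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreSizeDemo {
    public static void main(String[] args) throws Exception {
        // 2 core threads, up to 10 max -- but the unbounded queue
        // means the pool will never grow past 2.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 20; i++) {
            // each task blocks until released, so the pool is saturated
            executor.execute(() -> {
                try { release.await(); } catch (InterruptedException ignored) {}
            });
        }
        // all 20 tasks were accepted, yet only the 2 core threads exist
        System.out.println("pool size = " + executor.getPoolSize());
        release.countDown();
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Even though the maximum pool size is 10, the executor stays at its core size of 2 because every offer to the unbounded queue succeeds.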

OK, so this does not work; let's try to build one that does. The first thing we want is a blocking queue that is aware of the ThreadPoolExecutor:

public class ScalingQueue<E> extends LinkedBlockingQueue<E> {

    /**
     * The executor this queue belongs to.
     */
    private ThreadPoolExecutor executor;

    /**
     * Creates a ScalingQueue with a capacity of {@link Integer#MAX_VALUE}.
     */
    public ScalingQueue() {
        super();
    }

    /**
     * Creates a ScalingQueue with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue.
     */
    public ScalingQueue(int capacity) {
        super(capacity);
    }

    /**
     * Sets the executor this queue belongs to.
     */
    public void setThreadPoolExecutor(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    /**
     * Inserts the specified element at the tail of this queue if there is at
     * least one available thread to run the current task. If all pool threads
     * are actively busy, it rejects the offer.
     *
     * @param o the element to add.
     * @return true if it was possible to add the element to this queue,
     *         else false
     * @see ThreadPoolExecutor#execute(Runnable)
     */
    @Override
    public boolean offer(E o) {
        int allWorkingThreads = executor.getActiveCount() + super.size();
        return allWorkingThreads < executor.getPoolSize() && super.offer(o);
    }
}

As you can see, we reject the addition of a new task if there are no idle threads to handle it. This causes the thread pool executor to try to allocate a new thread (up to the maximum pool size); if the pool is already at its maximum, the task is rejected. In our case, a rejected task should simply be put back on the queue. This is easy to do with ThreadPoolExecutor, since we can implement our own RejectedExecutionHandler:

public class ForceQueuePolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            // should never happen since put on an unbounded queue never blocks
            throw new RejectedExecutionException(e);
        }
    }
}

Lastly, to speed up task additions to the queue, we can override the built-in getActiveCount method of ThreadPoolExecutor with a faster implementation (by default it acquires a lock and iterates over the current workers):

public class ScalingThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * Number of threads that are actively executing tasks.
     */
    private final AtomicInteger activeCount = new AtomicInteger();

    public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public int getActiveCount() {
        return activeCount.get();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        activeCount.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        activeCount.decrementAndGet();
    }
}

That is it. Now, we can construct our thread pool (a simple factory method is provided here):

public static ExecutorService newScalingThreadPool(int min, int max,
                                                   long keepAliveTime) {
    ScalingQueue<Runnable> queue = new ScalingQueue<Runnable>();
    ThreadPoolExecutor executor = new ScalingThreadPoolExecutor(
            min, max, keepAliveTime, TimeUnit.MILLISECONDS, queue);
    executor.setRejectedExecutionHandler(new ForceQueuePolicy());
    queue.setThreadPoolExecutor(executor);
    return executor;
}
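To sanity-check the whole thing, here is a condensed, self-contained sketch of the pieces above wired together. The class bodies are abbreviated, and the demo scaffolding (the 1-to-4 pool sizes, the latch, the sleeps between submissions) is mine: four blocking tasks submitted one at a time should grow a pool with core size 1 all the way to its maximum of four threads.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Abbreviated versions of the classes from the post, assembled into one file.
class ScalingQueue<E> extends LinkedBlockingQueue<E> {
    private ThreadPoolExecutor executor;
    void setThreadPoolExecutor(ThreadPoolExecutor executor) { this.executor = executor; }
    @Override public boolean offer(E o) {
        // only queue the task if an idle thread is available to pick it up
        int allWorkingThreads = executor.getActiveCount() + super.size();
        return allWorkingThreads < executor.getPoolSize() && super.offer(o);
    }
}

class ForceQueuePolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try { executor.getQueue().put(r); }
        catch (InterruptedException e) { throw new RejectedExecutionException(e); }
    }
}

class ScalingThreadPoolExecutor extends ThreadPoolExecutor {
    private final AtomicInteger activeCount = new AtomicInteger();
    ScalingThreadPoolExecutor(int core, int max, long keepAlive, TimeUnit unit,
                              BlockingQueue<Runnable> q) {
        super(core, max, keepAlive, unit, q);
    }
    @Override public int getActiveCount() { return activeCount.get(); }
    @Override protected void beforeExecute(Thread t, Runnable r) { activeCount.incrementAndGet(); }
    @Override protected void afterExecute(Runnable r, Throwable t) { activeCount.decrementAndGet(); }
}

public class ScalingPoolDemo {
    public static void main(String[] args) throws Exception {
        ScalingQueue<Runnable> queue = new ScalingQueue<Runnable>();
        ScalingThreadPoolExecutor executor = new ScalingThreadPoolExecutor(
                1, 4, 60_000L, TimeUnit.MILLISECONDS, queue);
        executor.setRejectedExecutionHandler(new ForceQueuePolicy());
        queue.setThreadPoolExecutor(executor);

        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 4; i++) {
            // each task blocks, so every submission finds all threads busy
            executor.execute(() -> {
                try { release.await(); } catch (InterruptedException ignored) {}
            });
            Thread.sleep(100); // let the new worker start before the next submit
        }
        // the pool scaled past its core size of 1 up to the maximum of 4
        System.out.println("pool size = " + executor.getPoolSize());
        release.countDown();
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Each submission finds every thread busy, so the overridden offer returns false and the executor allocates a new worker, exactly the behavior the plain LinkedBlockingQueue construction fails to deliver.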

Comments are welcome. Enjoy!
