Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
ThreadPoolExecutor using an unbounded queue where minThreads can be less than maxThreads
import scala.concurrent.ExecutionContext
import java.lang.Thread.UncaughtExceptionHandler
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ThreadPoolExecutor, LinkedBlockingQueue, ThreadFactory, TimeUnit}
import java.util.concurrent.{Executors, RejectedExecutionHandler, RejectedExecutionException}
object ScalingThreadPoolExecutor {
val defaultSecondsBeforeEviction = 60
def apply(minThreads: Int, maxThreads: Int, threadFactory: ThreadFactory): ScalingThreadPoolExecutor = {
val queue = new BlockingQueue
val rejectionHandler = new RejectionHandler
val sec = defaultSecondsBeforeEviction
val tpe = new ScalingThreadPoolExecutor(minThreads, maxThreads, sec, threadFactory, queue, rejectionHandler)
queue.executor = tpe
tpe
}
def apply(minThreads: Int, maxThreads: Int): ScalingThreadPoolExecutor = {
apply(minThreads, maxThreads, Executors.defaultThreadFactory)
}
class BlockingQueue extends LinkedBlockingQueue[Runnable] {
var executor: ScalingThreadPoolExecutor = _
/**
* Inserts the specified element at the tail of this queue if there is at
* least one available thread to run the current task. If all pool threads
* are actively busy, it rejects the offer. If the offer is rejected the
* ThreadPoolExecutor will try to allocate a new thread (up to the maximum).
* If a thread cannot be allocated the RejectionHandler will be called.
*/
override def offer(r: Runnable): Boolean = {
val allWorkingThreads = executor.getActiveCount + super.size
allWorkingThreads < executor.getPoolSize && super.offer(r)
}
}
class RejectionHandler extends RejectedExecutionHandler {
/**
* No threads are available to run this task. Queue it.
*/
def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor) {
try executor.getQueue.put(r) catch {
case iex: InterruptedException => throw new RejectedExecutionException(iex)
}
}
}
}
import ScalingThreadPoolExecutor._
/**
* ThreadPoolExecutor using an unbounded queue where minThreads can be less than maxThreads.
*
* The ThreadPoolExecutor javadoc says:
* If there are more than corePoolSize but less than maximumPoolSize threads running,
* a new thread will be created only if the queue is full.
*
* If you use a unbounded queue then ThreadPoolExecutor will never allocate more than corePoolSize
* threads. This executor fixes that problem.
*
* Adapted from https://github.com/kimchy/kimchy.github.com/blob/master/_posts/2008-11-23-juc-executorservice-gotcha.textile
*
* As an alternative see https://github.com/boundary/overlock (ThreadPool.instrumentedElastic)
*/
class ScalingThreadPoolExecutor(minThreads: Int, maxThreads: Int, secBeforeEviction: Int, threadFactory: ThreadFactory,
blockingQueue: BlockingQueue, rejectionHandler: RejectionHandler)
extends ThreadPoolExecutor(minThreads, maxThreads, secBeforeEviction, TimeUnit.SECONDS,
blockingQueue, threadFactory, rejectionHandler) {
// Use AtomicInteger to avoid locking to get activeCount
val activeCount = new AtomicInteger
override def getActiveCount: Int = activeCount.get()
override protected def beforeExecute(t: Thread, r: Runnable) { activeCount.incrementAndGet() }
override protected def afterExecute(r: Runnable, t: Throwable) { activeCount.decrementAndGet() }
}
/**
* An ExecutionContext factory for blocking threads
*/
object BlockingTaskExecutionContext extends Logging {
val ids = new AtomicInteger
def apply(poolName: String, maxThreads: Int = 10): ExecutionContext = {
val threadGroup = Thread.currentThread.getThreadGroup
val uncaughtHandler = new UncaughtExceptionHandler {
def uncaughtException(t: Thread, e: Throwable): Unit = {
log.error(s"Exception in $poolName thread: ${e.getMessage}", e)
}
}
val threadFactory = new ThreadFactory {
def newThread(r: Runnable): Thread = {
val name = s"$poolName-${ids.incrementAndGet()}"
val t = new Thread(threadGroup, r, name)
t.setDaemon(true) // Prevent live threads from stopping System.exit(0)
t.setUncaughtExceptionHandler(uncaughtHandler)
t
}
}
val executor = ScalingThreadPoolExecutor(0, maxThreads, threadFactory)
ExecutionContext.fromExecutor(executor)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.