@lancegatlin
Created August 28, 2019 15:41
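import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
// Assumption: ThreadFactoryBuilder is Guava's; the original snippet does not show its imports.
import com.google.common.util.concurrent.ThreadFactoryBuilder

/**
 * Create an ExecutionContext for blocking operations that grows with demand.
 *
 * @param threadNamePrefix the prefix to put at the beginning of the name of threads for this pool
 * @param unhandledExceptionLogger a function to log unhandled exceptions
 * @param minPoolSize the minimum number of threads to keep in this pool, even when idle
 * @param maxPoolSize the maximum allowed number of threads to allocate for this pool (the
 *                    SynchronousQueue holds no pending tasks, so with the default rejection
 *                    policy a request beyond this maximum is rejected with a
 *                    RejectedExecutionException rather than blocking the requester)
 * @param keepAliveTime the amount of time to keep an unused thread alive before destroying it
 * @return an ExecutionContext for blocking operations
 */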
def mkBlockingExecutionContext(
  threadNamePrefix: String,
  unhandledExceptionLogger: Throwable => Unit,
  minPoolSize: Int = 0,
  // 75% of 61379, the value `cat /proc/sys/kernel/threads-max` reported on one Ubuntu
  // machine; this limit is machine-dependent, so check it on the target host
  maxPoolSize: Int = 46034,
  keepAliveTime: Duration = 60.seconds
): ExecutionContext = {
  // Name threads "<prefix>-blocking-<n>" so they are easy to spot in thread dumps
  val threadFactory =
    new ThreadFactoryBuilder()
      .setNameFormat(s"$threadNamePrefix-blocking-%d")
      .build()
  // A SynchronousQueue never stores tasks: each submission is either handed to an
  // idle thread or causes a new thread to be created, up to maxPoolSize
  val threadPoolExecutor =
    new ThreadPoolExecutor(
      /* corePoolSize */ minPoolSize,
      /* maximumPoolSize */ maxPoolSize,
      /* keepAliveTime */ keepAliveTime.toMillis, TimeUnit.MILLISECONDS,
      /* workQueue */ new SynchronousQueue[Runnable](),
      threadFactory
    )
  ExecutionContext.fromExecutor(threadPoolExecutor, unhandledExceptionLogger)
}
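
The sketch below is not part of the original gist. It illustrates what a pool configured like the one above does once every thread is busy. To stay self-contained it skips Guava and builds a deliberately tiny pool (maximum of 2 threads) directly. The object name `SaturationSketch`, the method `run`, and the latch names are hypothetical and exist only for this illustration.

```scala
import java.util.concurrent.{CountDownLatch, RejectedExecutionException, SynchronousQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SaturationSketch {
  /** Returns (wasThirdTaskRejected, sumOfBlockingResults). */
  def run(): (Boolean, Int) = {
    // Same shape as the pool above, but capped at 2 threads for the demo
    val pool = new ThreadPoolExecutor(
      0, 2, 1L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]()
    )
    val ec: ExecutionContext =
      ExecutionContext.fromExecutor(pool, (t: Throwable) => t.printStackTrace())

    val started = new CountDownLatch(2) // counts down once per running task
    val release = new CountDownLatch(1) // lets the blocked tasks finish

    // Occupy both threads with tasks that block until released
    val blockers = (1 to 2).map { i =>
      Future { started.countDown(); release.await(); i }(ec)
    }
    started.await() // both pool threads are now busy

    // No idle thread and no room to grow: the submission is rejected
    val rejected =
      try {
        ec.execute(new Runnable { def run(): Unit = () })
        false
      } catch {
        case _: RejectedExecutionException => true
      }

    release.countDown()
    val sum = blockers.map(f => Await.result(f, 5.seconds)).sum
    pool.shutdown()
    (rejected, sum)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

If callers should slow down instead of failing at the limit, one option is to pass `new ThreadPoolExecutor.CallerRunsPolicy()` as the final constructor argument, which runs the overflow task on the submitting thread. That would change the gist's behavior, so treat it as a design choice to evaluate rather than a drop-in fix.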