Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
blockingを通るメソッドを大量に呼ぶとOOMになるので注意
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread, ThreadFactory}
import scala.concurrent.{BlockContext, CanAwait, ExecutionContext}
class ThreadPrinter extends Runnable {
override def run(): Unit = {
println(Thread.currentThread())
println("isDaemon: " + Thread.currentThread().isDaemon)
println("isInstanceOfBlockingContext: " + Thread.currentThread().isInstanceOf[BlockContext])
println("")
}
}
class BlockingRunner extends Runnable {
override def run(): Unit = {
scala.concurrent.blocking {
println("id: " + Thread.currentThread().getId)
Thread.sleep(500)
}
}
}
class NonBlockingRunner extends Runnable {
override def run(): Unit = {
println("id: " + Thread.currentThread().getId)
Thread.sleep(100)
}
}
class DefaultThreadFactory extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
def newThread(runnable: Runnable): Thread = new Thread(runnable)
def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = new ForkJoinWorkerThread(fjp) with BlockContext {
override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = {
var result: T = null.asInstanceOf[T]
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
@volatile var isdone = false
override def block(): Boolean = {
result = try thunk finally { isdone = true }
true
}
override def isReleasable = isdone
})
result
}
}
}
object FutureJikken {
val uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = new Thread.UncaughtExceptionHandler {
def uncaughtException(thread: Thread, cause: Throwable): Unit = cause.printStackTrace()
}
val defaultEc: ExecutionContext = scala.concurrent.ExecutionContext.global
val forkJoinEc: ExecutionContext = ExecutionContext.fromExecutorService(new ForkJoinPool(50))
val forkJoinWithFactory: ExecutionContext = ExecutionContext.fromExecutorService(new ForkJoinPool(1000, new DefaultThreadFactory, uncaughtExceptionHandler, false))
def main(args: Array[String]): Unit = {
printThreadInfo(forkJoinWithFactory)
checkThreadSize(defaultEc, 10000)
Thread.sleep(10000)
}
def printThreadInfo(ec: ExecutionContext) = {
/*
* defaultEc
* Thread[ForkJoinPool-1-worker-5,5,main]
* isDaemon: true
* isInstanceOfBlockingContext: true
*/
/*
* forkJoinEc
* Thread[ForkJoinPool-1-worker-57,5,main]
* isDaemon: true
* isInstanceOfBlockingContext: false
*/
/*
* forkJoinWithFactory
* Thread[ForkJoinPool-2-worker-57,5,main]
* isDaemon: false
* isInstanceOfBlockingContext: false
*/
ec.execute(new ThreadPrinter)
}
def checkThreadSize(ec: ExecutionContext, n: Int) = {
(1 to n).foreach(_ => ec.execute(new BlockingRunner))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment