Skip to content

Instantly share code, notes, and snippets.

@gabrieljones
Created January 25, 2022 14:56
Show Gist options
  • Save gabrieljones/db7db213a37033101be104b95551821f to your computer and use it in GitHub Desktop.
Save gabrieljones/db7db213a37033101be104b95551821f to your computer and use it in GitHub Desktop.
import java.util.{Timer, TimerTask}
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
import scala.concurrent.duration.{Duration, DurationInt, DurationLong}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future, Promise, blocking}
import scala.util.chaining.scalaUtilChainingOps
object ForkJoinPoolSandbox extends App {
val timer = new Timer()
val atomicRef = new AtomicLong(0)
def delay(d: Duration): Future[Unit] = {
val p = Promise[Unit]()
timer.schedule(new TimerTask {
override def run(): Unit = p.success(())
}, d.toMillis)
p.future
}
val pool = new ForkJoinPool(16)//*16)
implicit val executionContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(pool)
val start = System.nanoTime()
timer.schedule(new TimerTask {
override def run(): Unit = println(s"$pool ${atomicRef.get()}")//println(s"${pool.getPoolSize} ${pool.getQueuedTaskCount} ${pool.getRunningThreadCount} ${pool.getActiveThreadCount}")
}, 0, 1000)
val futures = for {
i <- 0 until 1024 * 2 * 2 * 2 * 2
} yield {
// delay(1.second).map(_ => i)
// Future{blocking{Thread.sleep(1000)};i}
// Future{Thread.sleep(1000);i}
// (0 until 1024).map(j=>delay(1.second).map(_ => (i, j).tap(x=>println(s"${System.nanoTime() - start} $x")))).pipe(fs => Future.sequence(fs))
/*
(0 until 1024).map(j=>delay(1.second).map(_ => (i, j)
// .tap(x=>println(s"${(System.nanoTime() - start).nanos.toSeconds} $x"))
))
.pipe(fs => Future.sequence(fs))
*/
// (0 until 16).map(j=>Future{blocking{Thread.sleep(1000)};println((i,j));(i, j)}).pipe(fs => Future.sequence(fs))
/*
(0 until 1024 * 2)
.map(j => Future.apply(j))
.map(_.map((i+Int.MaxValue/2) * 1024 + _).map(n => n -> ! ((2 until n) exists (n % _ == 0))))
.pipe(fs => Future.sequence(fs))
*/
(0 until 1024 * 1).map(j => Future{atomicRef.incrementAndGet()}).pipe(fs => Future.sequence(fs))
}
val future = Future.sequence(futures)
Await.result(future, Duration.Inf)
println("Futures Complete")
}
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should
import java.util.concurrent.{ForkJoinPool, TimeUnit}
class ForkJoinPoolTest extends AnyFunSuite with should.Matchers {
val parallelism = 16
test("simpl vs full") {
val simple = new ForkJoinPool(parallelism)
val full = new ForkJoinPool(
parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
false,
parallelism,
parallelism * 2,
1,
null,
60000,
TimeUnit.MILLISECONDS
)
val fns: Seq[ForkJoinPool => Any] = Seq(
_.getPoolSize,
_.getParallelism,
_.getAsyncMode,
_.getFactory,
_.getUncaughtExceptionHandler,
)
for {
fn <- fns
} {
fn(simple) should === (fn(full))
}
println(simple)
println(full)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment