Skip to content

Instantly share code, notes, and snippets.

@cm-kazup0n
Last active June 11, 2020 10:17
Show Gist options
  • Save cm-kazup0n/6807bad84f2f99f530fd46ef0eece824 to your computer and use it in GitHub Desktop.
Save cm-kazup0n/6807bad84f2f99f530fd46ef0eece824 to your computer and use it in GitHub Desktop.
import java.util.concurrent.{Executors, TimeUnit}
import cats.effect._
import cats.implicits._
import com.typesafe.scalalogging.StrictLogging
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
/** Demo: start `nProcess` long-running tasks as fibers, one every 500 ms, on a
  * dedicated fixed-size thread pool, then wait for all of them to finish.
  *
  * The whole program is described as a single `IO` value and interpreted exactly
  * once, at the very end of the object body.
  */
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
object AsyncExample extends App with StrictLogging {

  /** Number of tasks to start; also used as the size of the thread pool. */
  val nProcess = 5

  /** Builds an IO that, when run, logs, sleeps for `wait`, then starts `io` as a
    * fiber and returns that fiber without waiting for `io` to complete.
    *
    * @param wait how long to sleep before starting `io`
    * @param n    label used in the log line
    * @param io   the task to start in the background
    * @return the fiber running `io`; callers may `join` or `cancel` it
    */
  def invokeLater[A](wait: FiniteDuration, n: String)(io: IO[A])(
      implicit cs: ContextShift[IO],
      timer: Timer[IO]): IO[Fiber[IO, A]] =
    for {
      _     <- info(s"[process $n] wait for $wait")
      _     <- timer.sleep(wait)
      fiber <- io.start
    } yield fiber

  /** Suspends an info-level log call in `IO`; `msg` is by-name, so it is only
    * evaluated when the effect runs.
    */
  def info(msg: => String): IO[Unit] = IO(logger.info(msg))

  /** A fixed thread pool of `size` threads exposed as an `ExecutionContext`.
    *
    * On release the pool is shut down and given up to `timeout` to finish the
    * tasks it already accepted. If it has not terminated by then, a warning is
    * logged and the pool is shut down forcibly so that it is not left running.
    *
    * @param size    number of threads in the pool
    * @param timeout maximum time to wait for termination on release
    */
  def withThreadPool(size: Int, timeout: FiniteDuration): Resource[IO, ExecutionContext] =
    Resource
      .make(IO(Executors.newFixedThreadPool(size)))(es =>
        IO {
          es.shutdown()
          if (!es.awaitTermination(timeout.toMillis, TimeUnit.MILLISECONDS)) {
            logger.warn(s"thread pool did not terminate within $timeout; forcing shutdown")
            es.shutdownNow()
          }
          ()
        })
      .map(ExecutionContext.fromExecutorService)

  /** The complete program as one composed effect: acquire the pool, start every
    * task (each start is preceded by a 500 ms wait, so starts are staggered),
    * join all fibers, then release the pool.
    */
  def program: IO[Unit] =
    withThreadPool(nProcess, 60.seconds).use { ec =>
      implicit val cs: ContextShift[IO] = IO.contextShift(ec)
      implicit val timer: Timer[IO]     = IO.timer(ec)

      // The task each fiber runs; the sleep stands in for time-consuming work.
      def mainTask(n: Int): IO[Unit] =
        for {
          _ <- info(s"[process $n] **** main task started")
          _ <- timer.sleep(5.seconds)
          _ <- info(s"[process $n] **** main task done ****")
        } yield ()

      for {
        // traverse runs the effects in order, so task n+1 is started only after
        // task n's 500 ms wait has elapsed and its fiber has been started.
        fibers <- List
          .range(0, nProcess)
          .traverse(n => invokeLater(500.millis, n.toString)(mainTask(n)))
        // Wait for every started fiber to complete before releasing the pool.
        _ <- fibers.traverse_(_.join)
      } yield ()
    }

  // Single "end of the world" call: nothing is run before this point.
  program.unsafeRunSync()
}
@cm-kazup0n
Copy link
Author

2020-06-11 18:53:48,284 main [info] AsyncExample$ [process 0] wait for 500 milliseconds
2020-06-11 18:53:48,853 pool-1-thread-2 [info] AsyncExample$ [process 0] **** main task started
2020-06-11 18:53:48,860 pool-1-thread-1 [info] AsyncExample$ [process 1] wait for 500 milliseconds
2020-06-11 18:53:49,367 pool-1-thread-4 [info] AsyncExample$ [process 1] **** main task started
2020-06-11 18:53:49,367 pool-1-thread-3 [info] AsyncExample$ [process 2] wait for 500 milliseconds
2020-06-11 18:53:49,870 pool-1-thread-2 [info] AsyncExample$ [process 2] **** main task started
2020-06-11 18:53:49,870 pool-1-thread-5 [info] AsyncExample$ [process 3] wait for 500 milliseconds
2020-06-11 18:53:50,376 pool-1-thread-4 [info] AsyncExample$ [process 3] **** main task started
2020-06-11 18:53:50,376 pool-1-thread-1 [info] AsyncExample$ [process 4] wait for 500 milliseconds
2020-06-11 18:53:50,883 pool-1-thread-2 [info] AsyncExample$ [process 4] **** main task started
2020-06-11 18:53:53,863 pool-1-thread-5 [info] AsyncExample$ [process 0] **** main task done ****
2020-06-11 18:53:54,371 pool-1-thread-1 [info] AsyncExample$ [process 1] **** main task done ****
2020-06-11 18:53:54,876 pool-1-thread-4 [info] AsyncExample$ [process 2] **** main task done ****
2020-06-11 18:53:55,380 pool-1-thread-3 [info] AsyncExample$ [process 3] **** main task done ****
2020-06-11 18:53:55,885 pool-1-thread-2 [info] AsyncExample$ [process 4] **** main task done ****
true

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment