Last active
June 3, 2020 00:03
-
-
Save cm-kazup0n/81c94c816df33fe9970db0a7e185f323 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example | |
import java.util.concurrent.{Executors, TimeUnit} | |
import cats.data.ReaderT | |
import cats.effect.{Concurrent, ContextShift, IO} | |
import cats.implicits._ | |
import cats.kernel.Monoid | |
import cats.{Parallel, effect} | |
import example.APIClient.TotalResult | |
import simulacrum.typeclass | |
import scala.concurrent.ExecutionContext | |
/** One batch of items returned by the simulated API.
  *
  * @param payload the items carried by this batch
  * @param total   the total count reported alongside the payload
  * @tparam A element type of the payload
  */
final case class Result[A](payload: Seq[A], total: Long)

object Result {

  /** Monoid for [[Result]]: the identity is an empty payload with a total of 0;
    * combining concatenates the payloads (left then right) and adds the totals.
    */
  implicit def monoid[A]: Monoid[Result[A]] = new Monoid[Result[A]] {
    override def empty: Result[A] = Result[A](Seq.empty, 0)

    override def combine(left: Result[A], right: Result[A]): Result[A] =
      Result(left.payload ++ right.payload, left.total + right.total)
  }
}
/** Describes one page request against the simulated API.
  *
  * The type parameter is not used by any field; it only ties the request to the
  * element type of the [[Result]] it is expected to produce.
  *
  * @param from start position of the requested range (callers in this file pass
  *             `(page - 1) * size`)
  * @param size number of items requested
  * @tparam A element type of the expected result
  */
final case class Request[A](
  from: Long,
  size: Long
)
/** Type class that fabricates a [[Result]] for a given [[Request]].
  *
  * @tparam A element type of the generated payload
  */
@typeclass
sealed trait ResultGen[A] {

  /** Builds the result for `req`, given the `total` supplied by the environment. */
  def make(req: Request[A], total: Long): Result[A]
}

object ResultGen {

  /** Creates a generator whose payload consists of `total` evaluations of `f`.
    *
    * Note that the produced instance ignores the request itself: the payload
    * length is `total.toInt`, and `total` is echoed back in the result.
    *
    * @param f by-name element producer, re-evaluated for every payload slot
    */
  def instance[A](f: => A): ResultGen[A] = new ResultGen[A] {
    override def make(req: Request[A], total: Long): Result[A] = {
      val items = Seq.fill(total.toInt)(f)
      Result(items, total)
    }
  }
}
/** Fake API client used by the benchmark. */
object APIClient {

  /** Environment value read by [[run]]: the total passed on to `ResultGen.make`. */
  type TotalResult = Long

  /** Simulates a single API call.
    *
    * The returned reader, when run with a total, suspends an effect that blocks
    * the executing thread for 50 ms and then asks the `ResultGen[A]` instance to
    * build the result for `req`.
    *
    * @param req the request to "send"
    * @return a reader from the total to the suspended call
    */
  def run[F[_] : Concurrent, A: ResultGen](req: Request[A]): ReaderT[F, TotalResult, Result[A]] = {
    // The sleep stands in for network latency; it runs inside `delay`, so nothing
    // happens until the resulting effect is executed.
    def simulatedCall(total: TotalResult): F[Result[A]] =
      Concurrent[F].delay {
        Thread.sleep(50)
        ResultGen[A].make(req, total)
      }

    ReaderT[F, TotalResult, Result[A]](simulatedCall _)
  }
}
/** Timing statistics collected by the benchmark helpers in the companion.
  *
  * All durations are differences of `System.nanoTime()` readings, i.e. nanoseconds.
  *
  * @param min  shortest observed run
  * @param max  longest observed run
  * @param avg  arithmetic mean of all runs
  * @param runs every individual measurement, in execution order
  */
final case class Bench(min: Long, max: Long, avg: Float, runs: Seq[Long])

object Bench {

  /** Runs `f` synchronously `n` times, timing each run, and prints the statistics.
    *
    * If `n` is not positive nothing is executed and an all-zero [[Bench]] is printed.
    *
    * @param n number of times to execute `f`
    * @param f the action to measure; executed with `unsafeRunSync()`
    */
  def apply[A](n: Long)(f: IO[A]): Unit = {
    // `until` (exclusive) gives exactly n runs; the inclusive `to` ran n + 1 times.
    val runs: Seq[Long] = 0L.until(n).map { _ =>
      val start = System.nanoTime()
      f.unsafeRunSync()
      System.nanoTime() - start
    }

    // Statistics are computed once, after all runs, instead of on every iteration.
    val stats =
      if (runs.isEmpty) Bench(0L, 0L, 0f, Seq.empty)
      else Bench(runs.min, runs.max, runs.sum.toFloat / runs.length, runs) // floating-point mean, no truncation

    println(stats)
  }

  /** Benchmarks `f` on a dedicated fixed-size thread pool.
    *
    * A pool with `nThreads` threads is created, wrapped in a `ContextShift[IO]`
    * and handed to `f`; the resulting action is measured with [[apply]]. The pool
    * is always shut down afterwards, even when the benchmark throws.
    *
    * @param nThreads size of the thread pool backing the `ContextShift`
    * @param nTry     number of measured executions
    * @param f        builds the action to measure from the `ContextShift`
    * @return `true` if the pool terminated within one second of shutdown
    */
  def withContextShift[A](nThreads: Int, nTry: Long)(f: ContextShift[IO] => IO[A]): Boolean = {
    val es = Executors.newFixedThreadPool(nThreads)
    try {
      val shift: ContextShift[IO] = IO.contextShift(ExecutionContext.fromExecutorService(es))
      apply(nTry)(f(shift))
    } finally {
      // Release the worker threads regardless of how the benchmark ended.
      es.shutdownNow()
    }
    es.awaitTermination(1000, TimeUnit.MILLISECONDS)
  }
}
/** Entry point: benchmarks the paginated fetch in [[Main2.run]] on thread pools
  * of different sizes.
  */
object Main2 extends App {
  implicit val stringResultGen: ResultGen[String] = ResultGen.instance("Result")

  // 1 thread
  Bench.withContextShift(1, 5)(implicit s => run[IO, String](10).run(500L))
  Bench.withContextShift(1, 5)(implicit s => run[IO, String](10).run(500L))
  // 5 threads
  Bench.withContextShift(5, 5)(implicit s => run[IO, String](10).run(500L))
  Bench.withContextShift(5, 5)(implicit s => run[IO, String](10).run(500L))

  /** Fetches every page of a result set and combines them into one [[Result]].
    *
    * The first page is requested on its own to learn the reported total; the
    * remaining pages (2 to the last) are then requested with `parTraverse`. All
    * pages, including the first, are merged with the `Result` monoid.
    *
    * @param size page size; must be positive (the page count divides by it)
    * @return a reader from the total to the combined result of all pages
    */
  def run[F[_] : Concurrent : Parallel, A: ResultGen](size: Long): ReaderT[F, TotalResult, Result[A]] =
    ReaderT[F, TotalResult, Result[A]] { total =>
      APIClient.run[F, A](Request[A](0, size)).run(total).flatMap { first =>
        // Ceiling division, so a trailing partial page is fetched as well.
        val totalPages = (first.total + size - 1) / size

        2L.to(totalPages).toList
          .parTraverse(page => APIClient.run[F, A](Request[A]((page - 1) * size, size)).run(total))
          // Keep the first page: it was previously fetched but left out of the result.
          .map(rest => Monoid[Result[A]].combineAll(first :: rest))
      }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
threads = {1, 5, 10, 20}