Skip to content

Instantly share code, notes, and snippets.

@pchlupacek
Last active January 20, 2019 07:52
Show Gist options
  • Save pchlupacek/f53f63711d9a71a7b3aba19187503ab1 to your computer and use it in GitHub Desktop.
Save pchlupacek/f53f63711d9a71a7b3aba19187503ab1 to your computer and use it in GitHub Desktop.
fs2 par join - comparison
/* Related to issue https://github.com/functional-streams-for-scala/fs2/issues/1397 */
package fs2.concurrent
import java.util.concurrent.Executors
import cats.Traverse
import cats.implicits._
import cats.effect.{ExitCode, IO, IOApp}
import scala.annotation.tailrec
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
/**
  * Micro-benchmark that runs the same CPU-bound primality test over the same
  * range of numbers with three different parallel-execution strategies
  * (Scala `Future`, cats-effect `IO` fibers, fs2 `balanceThrough`) and prints
  * the wall-clock time, the result count and whether the result order is
  * non-decreasing for each of them.
  */
object Application extends IOApp {

  /** First candidate tested by every benchmark (inclusive). */
  private val firstCandidate: Long = 9138000000L

  /** Last candidate tested by every benchmark (inclusive). */
  private val lastCandidate: Long = 9139000000L

  /**
    * Trial-division primality test.
    *
    * @param n the candidate number
    * @return the candidate paired with `true` when it is prime; every value
    *         below 2 is reported as not prime
    */
  private def isPrime(n: Long): (Long, Boolean) = {
    // `divisor <= n / divisor` is the overflow-free form of
    // `divisor * divisor <= n`; it stays exact over the whole Long range,
    // unlike a floating-point square-root bound.
    @tailrec
    def hasNoDivisorFrom(divisor: Long): Boolean =
      if (divisor > n / divisor) true
      else if (n % divisor == 0) false
      else hasNoDivisorFrom(divisor + 1)

    (n, n >= 2 && hasNoDivisorFrom(2L))
  }

  /** Worker count: the number of available processors, but never fewer than 4. */
  val concurrency: Int = Runtime.getRuntime.availableProcessors().max(4)

  /** Fixed-size thread pool used by the benchmarks; shut down at the end of `run`. */
  implicit val ec: scala.concurrent.ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(concurrency))

  /**
    * Checks that the numbers (first tuple components) never decrease from one
    * element to the next. Empty and single-element inputs are trivially ordered.
    */
  private def isMonotonicallyIncreasing(ns: Vector[(Long, Boolean)]): Boolean =
    ns.iterator.zip(ns.iterator.drop(1)).forall {
      case ((previous, _), (next, _)) => previous <= next
    }

  /**
    * Wraps every candidate in its own `Future` on `ec` and blocks until all of
    * them have completed.
    */
  def concurrent7_parallel_future(): Vector[(Long, Boolean)] = {
    val data = (firstCandidate to lastCandidate).toVector
    val computation = Future.sequence(data.map(x => Future(isPrime(x))(ec)))
    Await.result(computation, Duration.Inf)
  }

  /**
    * Starts one `IO` fiber per candidate, then joins the fibers in input order
    * and runs the whole program synchronously.
    */
  def concurrent8_parallel_execution_io(): Vector[(Long, Boolean)] = {
    val data = (firstCandidate to lastCandidate).toVector
    val computation = for {
      fibers  <- Traverse[Vector].traverse(data)(x => IO(isPrime(x)).start)
      results <- Traverse[Vector].traverse(fibers)(_.join)
    } yield results
    computation.unsafeRunSync()
  }

  /**
    * Streams the candidate offsets through `balanceThrough` with `concurrency`
    * workers and collects everything into a `Vector`.
    */
  def concurrent9_parallel_execution_fs2: Vector[(Long, Boolean)] = {
    import fs2.Stream
    // Same inclusive candidate range as the other two benchmarks, expressed as
    // Int offsets from `firstCandidate`.
    val candidateCount: Int = (lastCandidate - firstCandidate + 1).toInt
    val data: Stream[IO, Int] =
      Stream.range(0, candidateCount)
    implicit val ioConcurrent: cats.effect.ConcurrentEffect[IO] =
      IO.ioConcurrentEffect(IO.contextShift(ec))
    val computations: Stream[IO, (Long, Boolean)] =
      data.balanceThrough(Int.MaxValue, concurrency) { balanced =>
        balanced.map { idx =>
          isPrime(idx + firstCandidate)
        }
      }
    computations.compile.toVector.unsafeRunSync()
  }

  /**
    * Evaluates `action` once, then prints how long it took, how many results
    * it produced and whether they came back in non-decreasing order.
    *
    * @param name   label used as the prefix of every printed line
    * @param action the benchmark to run; passed by name so that it is
    *               evaluated between the two clock readings
    */
  def time(name: String)(action: => Vector[(Long, Boolean)]): Unit = {
    println(s"starting $name")
    val start = System.nanoTime()
    val results = action
    val end = System.nanoTime()
    val delta = end - start
    println(s"$name took ${delta.nanos.toMillis} millis")
    println(s"$name results: ${results.size}")
    println(s"$name isMonotonic: ${isMonotonicallyIncreasing(results)}")
  }

  /** Runs the three benchmarks one after another and then releases the thread pool. */
  override def run(args: List[String]): IO[ExitCode] =
    IO {
      // Shut the pool down even when a benchmark throws; otherwise the pool is
      // never released on the failure path.
      try {
        time("SFututre")(concurrent7_parallel_future())
        time("IO")(concurrent8_parallel_execution_io())
        time("FS2")(concurrent9_parallel_execution_fs2)
        ExitCode.Success
      } finally ec.shutdown()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment