Skip to content

Instantly share code, notes, and snippets.

@SystemFw
Last active July 9, 2018 08:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save SystemFw/8b11eee997843f351ef7e53e1a350648 to your computer and use it in GitHub Desktop.
Save SystemFw/8b11eee997843f351ef7e53e1a350648 to your computer and use it in GitHub Desktop.
Parallel foldMonoid with fs2
object pfm {
  import cats.implicits._
  import cats.kernel.CommutativeMonoid
  import cats.effect._
  import fs2._
  import scala.concurrent.ExecutionContext

  /**
   * Combines every element of `input` with the `CommutativeMonoid` of `A`, spreading the
   * folding over `n` worker streams that all read from one shared bounded queue.
   *
   * Each worker folds the elements it dequeues into a partial result; the partial results
   * are folded once more into the value emitted by the returned stream. Which worker
   * receives which element is not fixed by this code, which is presumably why a
   * commutative monoid is required.
   *
   * Adapted from https://gist.github.com/djspiewak/3ff2fa10fb6aa6d59b02
   *
   * @param n     number of worker streams; also passed to `join` as its argument
   * @param input the elements to combine
   * @param ec    execution context required by the fs2 concurrency primitives used here
   * @return the stream produced by folding the workers' partial results
   */
  def parallelFoldMonoid[F[_]: Effect, A: CommutativeMonoid](n: Int)(
      input: Stream[F, A])(implicit ec: ExecutionContext): Stream[F, A] =
    Stream.eval(async.boundedQueue[F, Option[A]](n * n)).flatMap { queue =>
      // The queue is bounded at n * n, but any other limit would do.

      // One worker: folds everything it dequeues until it sees the `None` marker, then
      // puts a `None` back on the queue so that another worker can stop as well.
      def worker =
        queue.dequeue.unNoneTerminate.foldMonoid
          .onComplete(Stream.eval_(queue.enqueue1(None)))

      // A stream of n workers, to be run concurrently by `join`.
      val workers = Stream.range(0, n).map(_ => worker)

      // Feeds the queue with the input, followed by a single `None` end marker.
      val driver = input.noneTerminate.to(queue.enqueue)

      workers.join(n).foldMonoid.concurrently(driver)
    }
}
object Test {
  import cats.implicits._
  import cats.kernel.CommutativeMonoid
  import cats.effect._
  import fs2._
  import scala.concurrent.ExecutionContext

  /**
   * Same as parallelFoldMonoid, but with some logging to show the parallelism.
   *
   * Every worker prints each element it dequeues and the partial result it computes,
   * together with its own index and the name of the thread running the print.
   *
   * @param n     number of worker streams; also passed to `join` as its argument
   * @param input the elements to combine
   * @param F     `Effect` instance for `F`, also used to suspend the `println` calls
   * @param ec    execution context required by the fs2 concurrency primitives used here
   * @return the stream produced by folding the workers' partial results
   */
  def loggedParallelFoldMonoid[F[_], A: CommutativeMonoid](n: Int)(
      input: Stream[F, A])(implicit F: Effect[F], ec: ExecutionContext): Stream[F, A] =
    Stream.eval(async.boundedQueue[F, Option[A]](n * n)).flatMap { q => // n * n or whatever limit you want
      val workers = Stream.range(0, n).map { i =>
        q.dequeue.unNoneTerminate
          .evalMap(x => F.delay { println(s"$i has dequeued $x using Thread ${Thread.currentThread.getName}"); x })
          .foldMonoid
          .evalMap(x => F.delay { println(s"$i has computed $x using Thread ${Thread.currentThread.getName}"); x })
          // Hand the `None` end marker on so that another worker can terminate too.
          .onComplete(Stream.eval_(q.enqueue1(None)))
      }
      // Feeds the queue with the input, followed by a single `None` end marker.
      val driver = input.noneTerminate.to(q.enqueue)
      workers.join(n).foldMonoid.concurrently(driver)
    }

  // Brought into scope only from here on: the generic definition above keeps receiving
  // its ExecutionContext from the caller through its `ec` parameter.
  import scala.concurrent.ExecutionContext.Implicits.global

  /** Demo: combines the output of `Stream.range(1, 20)` in `IO` using three workers. */
  def stream: Stream[IO, Int] = loggedParallelFoldMonoid(3) {
    Stream.range(1, 20).covary[IO]
  }

  /** Runs the demo stream synchronously and collects everything it emits. */
  def res: Vector[Int] = stream.runLog.unsafeRunSync()

  // Sample session (interleaving and thread names vary from run to run):
  // scala > Test.res
  // 0 has dequeued 1 using Thread ForkJoinPool-1-worker-1
  // 2 has dequeued 3 using Thread ForkJoinPool-1-worker-1
  // 0 has dequeued 4 using Thread ForkJoinPool-1-worker-13
  // 2 has dequeued 5 using Thread ForkJoinPool-1-worker-15
  // 0 has dequeued 6 using Thread ForkJoinPool-1-worker-15
  // 2 has dequeued 7 using Thread ForkJoinPool-1-worker-5
  // 0 has dequeued 8 using Thread ForkJoinPool-1-worker-13
  // 2 has dequeued 9 using Thread ForkJoinPool-1-worker-13
  // 0 has dequeued 10 using Thread ForkJoinPool-1-worker-13
  // 1 has dequeued 2 using Thread ForkJoinPool-1-worker-13
  // 2 has dequeued 11 using Thread ForkJoinPool-1-worker-13
  // 0 has dequeued 12 using Thread ForkJoinPool-1-worker-5
  // 1 has dequeued 13 using Thread ForkJoinPool-1-worker-1
  // 2 has dequeued 14 using Thread ForkJoinPool-1-worker-11
  // 0 has dequeued 15 using Thread ForkJoinPool-1-worker-7
  // 1 has dequeued 16 using Thread ForkJoinPool-1-worker-1
  // 2 has dequeued 17 using Thread ForkJoinPool-1-worker-11
  // 0 has dequeued 18 using Thread ForkJoinPool-1-worker-15
  // 1 has dequeued 19 using Thread ForkJoinPool-1-worker-13
  // 2 has computed 66 using Thread ForkJoinPool-1-worker-1
  // 0 has computed 74 using Thread ForkJoinPool-1-worker-7
  // 1 has computed 50 using Thread ForkJoinPool-1-worker-9
  // res0: Vector[Int] = Vector(190)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment