Skip to content

Instantly share code, notes, and snippets.

@orium
Last active November 14, 2018 21:55
Show Gist options
  • Save orium/2f4d342cb8f2c8fa6a4b3a7ce8443282 to your computer and use it in GitHub Desktop.
Save orium/2f4d342cb8f2c8fa6a4b3a7ce8443282 to your computer and use it in GitHub Desktop.
Par actor - red book - 20181002
package ch07
import java.util.UUID
import java.util.concurrent.{Callable, CountDownLatch, ExecutorService, Executors}
import java.util.concurrent.atomic.AtomicReference
import ch07.Par.Par
import ch07.ParImpl.Par
object ParActor {
sealed trait Future[+A] {
private[ch07] def onComplete(cb: A => Unit): Unit
}
object Future {
private[ch07] def onComplete[A](f: (A => Unit) => Unit): Future[A] = new Future[A] {
override private[ch07] def onComplete(cb: A => Unit): Unit =
f(cb)
}
}
type Par[+A] = ExecutorService => Future[A]
object Par {
def unit[A](v: A): Par[A] =
es => Future.onComplete[A](cb => cb(v))
def fork[A](par: => Par[A]): Par[A] =
es => Future.onComplete[A] { cb =>
eval(es) {
par(es).onComplete(cb)
}
}
// Runs something in another thread.
private def eval(es: ExecutorService)(r: => Unit): Unit =
es.submit(new Callable[Unit] {
def call(): Unit = r
})
def run[A](es: ExecutorService)(p: Par[A]): A = {
val ref = new AtomicReference[A]
val latch = new CountDownLatch(1)
p(es).onComplete { r =>
ref.set(r)
latch.countDown()
}
latch.await()
ref.get()
}
def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] = es => Future.onComplete[C] { cb =>
// val mapId = UUID.randomUUID()
val ar: AtomicReference[Option[A]] = new AtomicReference(None)
val br: AtomicReference[Option[B]] = new AtomicReference(None)
val combiner = Actor[Either[A, B]](es) { msg =>
// println(s"in map2 $mapId with thread ${Thread.currentThread()}")
msg match {
case Left(a) => ar.set(Some(a))
case Right(b) => br.set(Some(b))
}
(ar.get(), br.get()) match {
case (Some(av), Some(bv)) =>
eval(es) {
cb(f(av, bv))
}
case _ => ()
}
}
pa(es).onComplete(v => combiner ! Left(v))
pb(es).onComplete(v => combiner ! Right(v))
}
}
}
object Main extends App {
import ParActor.Par
def `give me a number n of threads and I shall give you a Par that deadlocks with an executor service with n thread but does not in an executor service with n+1 threads!`(nThreads: Int): Par[Int] =
nThreads match {
case 0 => Par.fork(Par.unit(0))
case n => Par.fork(`give me a number n of threads and I shall give you a Par that deadlocks with an executor service with n thread but does not in an executor service with n+1 threads!`(n - 1))
}
val par = `give me a number n of threads and I shall give you a Par that deadlocks with an executor service with n thread but does not in an executor service with n+1 threads!`(3)
val es3 = Executors.newFixedThreadPool(3)
println(Par.run(es3)(par))
def sum(ints: IndexedSeq[Int]): Par[Int] =
if (ints.length <= 1)
Par.unit(ints.headOption.getOrElse(0))
else {
val (l, r) = ints.splitAt(ints.length / 2)
Par.map2(sum(l), sum(r))(_ + _)
}
val summedNumbers = Par.run(es3)(sum(IndexedSeq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
println(summedNumbers)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment