Skip to content

Instantly share code, notes, and snippets.

@pchiusano
Created January 31, 2013 23:11
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pchiusano/4687597 to your computer and use it in GitHub Desktop.
Save pchiusano/4687597 to your computer and use it in GitHub Desktop.
Trampolined Future type.
package scalaz.concurrent
import Future._
trait Future[+A] {
def flatMap[B](f: A => Future[B]): Future[B] = this match {
case Now(a) => More(() => f(a))
case Later(listen) => BindLater(listen, f)
case More(force) => BindMore(force, f)
case BindLater(listen,g) => More(() =>
BindLater(listen)(x => g(x) flatMap f))
case BindMore(force,g) => More(() =>
BindMore(force)(x => g(x) flatMap f))
}
def map[B](f: A => B): Future[B] =
this flatMap (a => Now(f(a)))
final def runAsync(onFinish: A => Unit): Future[Unit] = this.start match {
case Now(a) => now(onFinish(a))
case Later(r) => now(r(onFinish))
case BindLater(r, f) =>
val latch = new java.util.concurrent.CountDownLatch(1)
@volatile var result: Option[Any] = None
r { x => result = Some(x); latch.countDown }
more { latch.await; f(result.get).runAsync(onFinish) }
case _ => sys.error("Impossible!")
}
@annotation.tailrec
final def start: Future[A] = this match {
case More(force) => force().start
case BindMore(force,f) => (force() flatMap f).start
case _ => this // Now, Later, BindLater
}
// synchronous evaluator
def run: A = this.start match {
case Now(a) => a
case f => {
val latch = new java.util.concurrent.CountDownLatch(1)
@volatile var result: Option[A] = None
f.runAsync { a => result = Some(a); latch.countDown }.run
latch.await
result.get
}
}
}
object Future {
// define constructors inside companion object
case class Now[+A](get: A) extends Future[A]
case class More[+A](force: () => Future[A]) extends Future[A]
case class BindMore[A,+B](force: () => Future[A],
f: A => Future[B]) extends Future[B]
case class Later[+A](listen: (A => Unit) => Unit) extends Future[A]
case class BindLater[A,+B](listen: (A => Unit) => Unit,
f: A => Future[B]) extends Future[B]
// curried versions for type inference
def BindLater[A,B](listen: (A => Unit) => Unit)(
f: A => Future[B]): Future[B] =
BindLater(listen, f)
def BindMore[A,B](force: () => Future[A])(
f: A => Future[B]): Future[B] =
BindMore(force, f)
// hide java.util.concurrent.Future, import everything else
import java.util.concurrent.{Future => _, _}
def more[A](a: => Future[A]): Future[A] = More(() => a)
def delay[A](a: => A): Future[A] = More(() => Now(a))
def now[A](a: A): Future[A] = Now(a)
def fork[A](a: => Future[A]): Future[A] = apply(a) flatMap (a => a)
// Create a Later from a nonstrict value,
// backed by a shared thread pool (declared below)
def apply[A](a: => A): Future[A] = {
@volatile var result: Option[A] = None
val latch = new java.util.concurrent.CountDownLatch(1)
val task = pool.submit { new Callable[Unit] {
def call = { result = Some(a); latch.countDown }
}}
more { Later { cb => latch.await; cb(result.get) } }
}
// Daemon threads will not prevent the JVM from exiting, if they are
// the only threads left running (see java.lang.Thread API docs for
// details)
val daemonize = new ThreadFactory { def newThread(r: Runnable) = {
val t = new Thread(r)
t.setDaemon(true)
t
}}
val pool = Executors.newCachedThreadPool(daemonize)
}
object FutureTest extends App {
val N = 100000
def worstCaseScenario1 =
(0 to N).map(i => Future(i).start).foldLeft(Future(0)) {
(f1,f2) => for {
acc <- f1
i <- f2
} yield (acc + i)
}
def worstCaseScenario2 =
(0 to N).map(i => delay(i)).foldLeft(delay(0)) {
(f1,f2) => for {
acc <- f1
i <- f2
} yield (acc + i)
}
def worstCaseScenario2a =
(0 to N).map(i => delay(i)).foldLeft(now(0)) {
(f1,f2) => for {
acc <- f1
i <- f2
} yield (acc + i)
}
def worstCaseScenario3 =
(0 to N).map(i => Future(i).start).foldLeft(delay(0)) {
(f1,f2) => for {
acc <- f1
i <- f2
} yield (acc + i)
}
def runRepeatedly(n: Int): Future[Unit] =
if (n > 0) Future { () } flatMap (_ => runRepeatedly(n-1))
else now(())
println(worstCaseScenario1.run)
println(worstCaseScenario2.run)
println(worstCaseScenario2a.run)
println(worstCaseScenario3.run)
runRepeatedly(10000).run
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment