Created
January 31, 2013 23:11
-
-
Save pchiusano/4687597 to your computer and use it in GitHub Desktop.
Trampolined Future type.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scalaz.concurrent | |
import Future._ | |
trait Future[+A] { | |
def flatMap[B](f: A => Future[B]): Future[B] = this match { | |
case Now(a) => More(() => f(a)) | |
case Later(listen) => BindLater(listen, f) | |
case More(force) => BindMore(force, f) | |
case BindLater(listen,g) => More(() => | |
BindLater(listen)(x => g(x) flatMap f)) | |
case BindMore(force,g) => More(() => | |
BindMore(force)(x => g(x) flatMap f)) | |
} | |
def map[B](f: A => B): Future[B] = | |
this flatMap (a => Now(f(a))) | |
final def runAsync(onFinish: A => Unit): Future[Unit] = this.start match { | |
case Now(a) => now(onFinish(a)) | |
case Later(r) => now(r(onFinish)) | |
case BindLater(r, f) => | |
val latch = new java.util.concurrent.CountDownLatch(1) | |
@volatile var result: Option[Any] = None | |
r { x => result = Some(x); latch.countDown } | |
more { latch.await; f(result.get).runAsync(onFinish) } | |
case _ => sys.error("Impossible!") | |
} | |
@annotation.tailrec | |
final def start: Future[A] = this match { | |
case More(force) => force().start | |
case BindMore(force,f) => (force() flatMap f).start | |
case _ => this // Now, Later, BindLater | |
} | |
// synchronous evaluator | |
def run: A = this.start match { | |
case Now(a) => a | |
case f => { | |
val latch = new java.util.concurrent.CountDownLatch(1) | |
@volatile var result: Option[A] = None | |
f.runAsync { a => result = Some(a); latch.countDown }.run | |
latch.await | |
result.get | |
} | |
} | |
} | |
object Future { | |
// define constructors inside companion object | |
case class Now[+A](get: A) extends Future[A] | |
case class More[+A](force: () => Future[A]) extends Future[A] | |
case class BindMore[A,+B](force: () => Future[A], | |
f: A => Future[B]) extends Future[B] | |
case class Later[+A](listen: (A => Unit) => Unit) extends Future[A] | |
case class BindLater[A,+B](listen: (A => Unit) => Unit, | |
f: A => Future[B]) extends Future[B] | |
// curried versions for type inference | |
def BindLater[A,B](listen: (A => Unit) => Unit)( | |
f: A => Future[B]): Future[B] = | |
BindLater(listen, f) | |
def BindMore[A,B](force: () => Future[A])( | |
f: A => Future[B]): Future[B] = | |
BindMore(force, f) | |
// hide java.util.concurrent.Future, import everything else | |
import java.util.concurrent.{Future => _, _} | |
def more[A](a: => Future[A]): Future[A] = More(() => a) | |
def delay[A](a: => A): Future[A] = More(() => Now(a)) | |
def now[A](a: A): Future[A] = Now(a) | |
def fork[A](a: => Future[A]): Future[A] = apply(a) flatMap (a => a) | |
// Create a Later from a nonstrict value, | |
// backed by a shared thread pool (declared below) | |
def apply[A](a: => A): Future[A] = { | |
@volatile var result: Option[A] = None | |
val latch = new java.util.concurrent.CountDownLatch(1) | |
val task = pool.submit { new Callable[Unit] { | |
def call = { result = Some(a); latch.countDown } | |
}} | |
more { Later { cb => latch.await; cb(result.get) } } | |
} | |
// Daemon threads will not prevent the JVM from exiting, if they are | |
// the only threads left running (see java.lang.Thread API docs for | |
// details) | |
val daemonize = new ThreadFactory { def newThread(r: Runnable) = { | |
val t = new Thread(r) | |
t.setDaemon(true) | |
t | |
}} | |
val pool = Executors.newCachedThreadPool(daemonize) | |
} | |
object FutureTest extends App { | |
val N = 100000 | |
def worstCaseScenario1 = | |
(0 to N).map(i => Future(i).start).foldLeft(Future(0)) { | |
(f1,f2) => for { | |
acc <- f1 | |
i <- f2 | |
} yield (acc + i) | |
} | |
def worstCaseScenario2 = | |
(0 to N).map(i => delay(i)).foldLeft(delay(0)) { | |
(f1,f2) => for { | |
acc <- f1 | |
i <- f2 | |
} yield (acc + i) | |
} | |
def worstCaseScenario2a = | |
(0 to N).map(i => delay(i)).foldLeft(now(0)) { | |
(f1,f2) => for { | |
acc <- f1 | |
i <- f2 | |
} yield (acc + i) | |
} | |
def worstCaseScenario3 = | |
(0 to N).map(i => Future(i).start).foldLeft(delay(0)) { | |
(f1,f2) => for { | |
acc <- f1 | |
i <- f2 | |
} yield (acc + i) | |
} | |
def runRepeatedly(n: Int): Future[Unit] = | |
if (n > 0) Future { () } flatMap (_ => runRepeatedly(n-1)) | |
else now(()) | |
println(worstCaseScenario1.run) | |
println(worstCaseScenario2.run) | |
println(worstCaseScenario2a.run) | |
println(worstCaseScenario3.run) | |
runRepeatedly(10000).run | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment