Created
September 3, 2011 18:23
-
-
Save benmmurphy/1191571 to your computer and use it in GitHub Desktop.
Suspendable Future Implementation for Scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* scalac -P:continuations:enable */ | |
import scala.util.continuations._ | |
import scala.collection.mutable._ | |
class Future[A] private (ctx: => A @suspendable) { | |
private var res:Option[A] = None | |
def await() : A @suspendable = { | |
if (res.isEmpty) { | |
val result = ctx | |
res = Some(result) | |
result | |
} else { | |
shiftUnit(res.get) | |
} | |
} | |
} | |
object Future { | |
def makeFuture[A,B](func : (A) => B @suspendable) : ((A) => Future[B]) = { | |
def futureFunc(a:A) : Future[B] = { | |
var cb : (B => Unit) = null | |
val future = new Future[B](shift { k : (B => Unit) => | |
cb = k | |
}) | |
reset { | |
val ret = func(a) | |
if (cb == null) { | |
future.res = Some(ret) | |
} else { | |
cb(ret) | |
} | |
} | |
future | |
} | |
futureFunc | |
} | |
} | |
object Test { | |
val conts = new PriorityQueue[Scheduled]()(Ordering[Scheduled].reverse) | |
case class Scheduled (val time : Long, val continuation : (Unit=>Unit)) extends Ordered[Scheduled] { | |
def compare(other: Scheduled) = time.compare(other.time) | |
} | |
def sleep(delay:Int) = { | |
shift { k: (Unit => Unit) => | |
val schedule = Scheduled(System.currentTimeMillis() + delay, k) | |
conts.enqueue(schedule) | |
} | |
} | |
def sleepTwice(delay: Int) = { | |
val firstStart = System.currentTimeMillis() | |
sleep(delay) | |
val secondStart = System.currentTimeMillis() | |
sleep(delay + 500) | |
(secondStart - firstStart, System.currentTimeMillis() - secondStart) | |
} | |
def processCont = { | |
var oldCont = conts.dequeue | |
val timeLeft = oldCont.time - System.currentTimeMillis() | |
if (timeLeft > 0) { | |
Thread.sleep(timeLeft) | |
} | |
oldCont.continuation() | |
} | |
def main(args:Array[String]) : Unit = { | |
var sleepAsync = Future.makeFuture(sleep) | |
var sleepTwiceAsync = Future.makeFuture(sleepTwice) | |
reset { | |
val res1Start = System.currentTimeMillis() | |
sleep(1000) | |
println("res1 took: ", System.currentTimeMillis() - res1Start) | |
val res2Start = System.currentTimeMillis() | |
var res2Future = sleepAsync(2000) | |
val res3Start = System.currentTimeMillis() | |
var res3Future = sleepAsync(3000) | |
res2Future.await | |
println("res2 took: ", System.currentTimeMillis() - res2Start) | |
res3Future.await | |
println("res3 took: ", System.currentTimeMillis() - res3Start) | |
val res4Start = System.currentTimeMillis() | |
val res4Future = sleepTwiceAsync(250) | |
val res5Start = System.currentTimeMillis() | |
val res5Future = sleepTwiceAsync(500) | |
val res4 = res4Future.await | |
println("res4 took: ", System.currentTimeMillis() - res4Start, res4) | |
val res5 = res5Future.await | |
println("res5 took: ", System.currentTimeMillis() - res5Start, res5) | |
} | |
while (!conts.isEmpty) { | |
processCont | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Produces the output:
(res1 took: ,1007)
(res2 took: ,2013)
(res3 took: ,3001)
(res4 took: ,1003,(251,751))
(res5 took: ,1503,(502,1001))