Skip to content

Instantly share code, notes, and snippets.

@derekjw
Created April 3, 2011 18:45
Show Gist options
  • Save derekjw/900664 to your computer and use it in GitHub Desktop.
Save derekjw/900664 to your computer and use it in GitHub Desktop.
Some possible ideas for making dataflow easier with Future
diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala
index cfe64a8..f1fb976 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Future.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala
@@ -217,6 +217,15 @@ object Future {
dispatcher.dispatchFuture(FutureInvocation(f.asInstanceOf[CompletableFuture[Any]], () => body))
f
}
+ /** Runs the Unit-returning `body` by delegating to `apply` with the given timeout and dispatcher. */
+ def task(body: => Unit, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[Unit] =
+ apply(body, timeout)(dispatcher)
+ /** Creates a new `DefaultCompletableFuture[T]` with the given timeout (defaults to `Actor.TIMEOUT`). */
+ def promise[T](timeout: Long = Actor.TIMEOUT): CompletableFuture[T] =
+ new DefaultCompletableFuture[T](timeout)
+ /** Wraps `in` in an `AlreadyCompletedFuture`; presumably Left carries the failure and Right the result (confirm). */
+ def value[T](in: Either[Throwable,T]): Future[T] =
+ new AlreadyCompletedFuture(in)
}
sealed trait Future[+T] {
@@ -335,6 +344,14 @@ sealed trait Future[+T] {
}
}
+ /**
+  * On completion, applies `pf` to this Future's exception if one is present and `pf` is defined for it. Returns the result of `onComplete`.
+  */
+ final def onException(pf: PartialFunction[Throwable, Unit]): Future[T] = onComplete { f =>
+   // foreach over the optional exception replaces the isDefined/get pair; it is a no-op when there is no exception.
+   f.exception foreach { e => if (pf.isDefinedAt(e)) pf(e) }
+ }
+
/**
* Creates a new Future by applying a PartialFunction to the successful
* result of this Future if a match is found, or else return a MatchError.
diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala
index a946713..afa7f26 100644
--- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala
+++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala
@@ -364,16 +364,16 @@ class FutureSpec extends JUnitSuite {
}
@Test def lesslessIsMore {
- import akka.actor.Actor.spawn
- val dataflowVar, dataflowVar2 = new DefaultCompletableFuture[Int](Long.MaxValue)
+ import Future.task
+ val dataflowVar, dataflowVar2 = Future.promise[Int]()
val begin, end = new StandardLatch
- spawn {
+ task {
begin.await
dataflowVar2 << dataflowVar
end.open
}
- spawn {
+ task {
dataflowVar << 5
}
begin.open
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment