Open the REPL and import some stuff:
Welcome to Scala 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131).
Type in expressions for evaluation. Or try :help.
scala> import scala.concurrent._, duration._, ExecutionContext.Implicits.global
import scala.concurrent._
import duration._
import ExecutionContext.Implicits.global
Define some successful and failing futures for testing:
scala> :paste
// Entering paste mode (ctrl-D to finish)
def future1: Future[Int] = Future {
  println("future 1 is executing!")
  1
}
def future2: Future[Int] = Future {
  println("future 2 is executing!")
  throw new Exception("Future 2 failed")
}
def future3: Future[Int] = Future {
  println("future 3 is executing!")
  throw new Exception("Future 3 failed")
}
def future4: Future[Int] = Future {
  println("future 4 is executing!")
  4
}
// Exiting paste mode, now interpreting.
future1: scala.concurrent.Future[Int]
future2: scala.concurrent.Future[Int]
future3: scala.concurrent.Future[Int]
future4: scala.concurrent.Future[Int]
If I flatMap the futures, it bails after the first failing future, and the resulting future is a failure.
scala> :paste
// Entering paste mode (ctrl-D to finish)
val res = for {
  f1 <- future1
  f2 <- future2
  f3 <- future3
  f4 <- future4
} yield f1 + f2 + f3 + f4
Await.result(res, 1 second)
// Exiting paste mode, now interpreting.
future 1 is executing!
future 2 is executing!
java.lang.Exception: Future 2 failed
at $anonfun$future2$1.apply(<console>:27)
at $anonfun$future2$1.apply(<console>:25)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
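To see why future3 and future4 never start, it can help to write the for-comprehension out by hand. The following is a minimal sketch using only the standard library; the ok and boom helpers and the started counter are hypothetical names introduced here to count how many future bodies actually run.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical counter: incremented each time a future body starts running.
val started = new AtomicInteger(0)

// Hypothetical stand-ins for the succeeding and failing futures above.
def ok(n: Int): Future[Int] = Future { started.incrementAndGet(); n }
def boom(n: Int): Future[Int] =
  Future { started.incrementAndGet(); throw new Exception(s"Future $n failed") }

// Roughly what a four-step for-comprehension expands to:
// each later future is only created inside the previous callback.
val desugared: Future[Int] =
  ok(1).flatMap { f1 =>
    boom(2).flatMap { f2 =>
      boom(3).flatMap { f3 =>
        ok(4).map { f4 => f1 + f2 + f3 + f4 }
      }
    }
  }

// Capture the outcome as a Try instead of letting the exception escape.
val outcome: Try[Int] = Try(Await.result(desugared, 1.second))
```

Once boom(2) fails, its flatMap callback is skipped, so boom(3) and ok(4) are never even constructed and the counter stops at 2.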
I could stick a recovery on each of the futures to ensure they all get executed, but that loses the exception information:
scala> :paste
// Entering paste mode (ctrl-D to finish)
def recovery: PartialFunction[Throwable, Int] = {
  case _ => 0
}
val res = for {
  f1 <- future1 recover recovery
  f2 <- future2 recover recovery
  f3 <- future3 recover recovery
  f4 <- future4 recover recovery
} yield f1 + f2 + f3 + f4
Await.result(res, 1 second)
// Exiting paste mode, now interpreting.
future 1 is executing!
future 2 is executing!
future 3 is executing!
future 4 is executing!
res4: Int = 5
Cats to the rescue!
scala> import cats._, cats.implicits._, cats.data._
import cats._
import cats.implicits._
import cats.data._
Cats has Apply. This lets you "map" over many things in parallel, and act on all the results at once. Here's an example with our futures:
scala> :paste
// Entering paste mode (ctrl-D to finish)
val res = (future1 |@| future2 |@| future3 |@| future4).map(_ + _ + _ + _)
// or equivalently, Apply[Future].map4(future1, future2, future3, future4)(_ + _ + _ + _)
Await.result(res, 1 second)
// Exiting paste mode, now interpreting.
future 1 is executing!
future 2 is executing!
future 3 is executing!
future 4 is executing!
java.lang.Exception: Future 2 failed
at $anonfun$future2$1.apply(<console>:27)
at $anonfun$future2$1.apply(<console>:25)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Unlike our for-comprehension, this ran all the futures in parallel. But like the for-comprehension, the overall result is still the first one that failed.
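The parallelism comes from when the futures are created rather than from Apply itself: a Future starts running as soon as it is constructed, and here all four are constructed before they are combined. A rough standard-library sketch of the same effect, assuming hypothetical ok and boom helpers and a started counter that are not part of the original example:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical counter: incremented each time a future body starts running.
val started = new AtomicInteger(0)

def ok(n: Int): Future[Int] = Future { started.incrementAndGet(); n }
def boom(n: Int): Future[Int] =
  Future { started.incrementAndGet(); throw new Exception(s"Future $n failed") }

// Creating the futures up front starts all four immediately.
val a = ok(1)
val b = boom(2)
val c = boom(3)
val d = ok(4)

// The for-comprehension now only waits on futures that are already running.
val combined: Future[Int] = for {
  w <- a
  x <- b
  y <- c
  z <- d
} yield w + x + y + z

// Wait for every future to finish so the counter is stable before reading it.
List(a, b, c, d).foreach(f => Await.ready(f, 1.second))
val outcome: Try[Int] = Try(Await.result(combined, 1.second))
```

All four bodies run, yet the combined result still reports only the first failure in sequence, which matches the behaviour seen with Apply above.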
We can do better. Cats also defines the Validated[A, B] class. Its structure is similar to Either[A, B]: it has either an A or a B value, but not both. But its semantics are a little different in cats. Basically, Either is used for "short-circuiting" behavior (i.e., the for-comprehension example above). Validated is used for "error accumulation" behavior. We'll see how this works in a minute.
ValidatedNel[A, B] is just a type alias for Validated[NonEmptyList[A], B]. Cats provides this alias because accumulating errors in a NonEmptyList is a common pattern.
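The difference between the two is easiest to see on plain values, without any futures involved. This is a small sketch assuming a newer Cats release (1.0 or later, on Scala 2.13), where the tuple mapN syntax takes the place of the |@| builder used in this post; the value names and error strings are made up for illustration.

```scala
import cats.data.{NonEmptyList, Validated, ValidatedNel}
import cats.implicits._

// Two failing Either values: the for-comprehension stops at the first Left.
val e1: Either[String, Int] = Left("first error")
val e2: Either[String, Int] = Left("second error")
val shortCircuited: Either[String, Int] = for {
  x <- e1
  y <- e2
} yield x + y

// Two failing ValidatedNel values: combining them keeps both errors.
val v1: ValidatedNel[String, Int] = Validated.invalidNel("first error")
val v2: ValidatedNel[String, Int] = Validated.invalidNel("second error")
val accumulated: ValidatedNel[String, Int] = (v1, v2).mapN(_ + _)
```

The Either version never looks at e2, while the Validated version ends up holding both error messages in its NonEmptyList.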
We can convert a Future[A] to a Future[ValidatedNel[Throwable, A]] like so:
def toValidatedNel[A](future: Future[A]): Future[ValidatedNel[Throwable, A]] = {
  future.map(Validated.valid).recover { case e =>
    Validated.invalidNel(e)
  }
}
However, I like using implicit classes for these types of conversions (maybe a personal preference):
scala> :paste
// Entering paste mode (ctrl-D to finish)
implicit class EnrichedFuture[A](future: Future[A]) {
  def toValidatedNel: Future[ValidatedNel[Throwable, A]] = {
    future.map(Validated.valid).recover { case e =>
      Validated.invalidNel(e)
    }
  }
}
// Exiting paste mode, now interpreting.
defined class EnrichedFuture
Now let's use Apply with our Validated converter:
scala> :paste
// Entering paste mode (ctrl-D to finish)
val res = (future1.toValidatedNel |@| future2.toValidatedNel |@| future3.toValidatedNel |@| future4.toValidatedNel).map(_ |+| _ |+| _ |+| _)
Await.result(res, 1 second)
// Exiting paste mode, now interpreting.
future 1 is executing!
future 2 is executing!
future 3 is executing!
future 4 is executing!
res7: cats.data.Validated[cats.data.NonEmptyList[Throwable],Int] = Invalid(NonEmptyList(java.lang.Exception: Future 2 failed, java.lang.Exception: Future 3 failed))
Interesting - it still ran all the futures, but this time it collected all the failures in a NonEmptyList. The resulting future is successful with an Invalid(NonEmptyList(Exception, Exception)) inside of it.
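The |+| operator is what merges the individual results: for two Validated values it combines the valid sides (addition, for Int) and concatenates the error lists. A minimal sketch on plain values, assuming String errors instead of Throwable so the results are easy to compare; the value names are hypothetical.

```scala
import cats.data.{NonEmptyList, Validated, ValidatedNel}
import cats.implicits._

val ok1: ValidatedNel[String, Int] = Validated.valid(1)
val bad2: ValidatedNel[String, Int] = Validated.invalidNel("Future 2 failed")
val bad3: ValidatedNel[String, Int] = Validated.invalidNel("Future 3 failed")
val ok4: ValidatedNel[String, Int] = Validated.valid(4)

// Valid |+| Valid combines the values with the Int semigroup (addition).
val allGood: ValidatedNel[String, Int] = ok1 |+| ok4

// As soon as an Invalid is involved the valid values are dropped,
// and further Invalids have their error lists concatenated.
val mixed: ValidatedNel[String, Int] = ok1 |+| bad2 |+| bad3 |+| ok4

// One way to consume the result: fold over the two cases.
val report: String = mixed.fold(
  errors => {
    val messages = errors.toList
    messages.size.toString + " failures: " + messages.mkString(", ")
  },
  sum => "sum = " + sum
)
```

Folding (or pattern matching on Valid and Invalid) is a reasonable way to turn the accumulated result into something a caller can act on.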
Enter Traverse, a seriously awesome typeclass. Here's an example using the data we've built up so far:
scala> :paste
// Entering paste mode (ctrl-D to finish)
val res = List(future1, future2, future3, future4).traverse(_.toValidatedNel)
Await.result(res, 1 second)
// Exiting paste mode, now interpreting.
future 1 is executing!
future 3 is executing!
future 2 is executing!
future 4 is executing!
res11: List[cats.data.ValidatedNel[Throwable,Int]] = List(Valid(1), Invalid(NonEmptyList(java.lang.Exception: Future 2 failed)), Invalid(NonEmptyList(java.lang.Exception: Future 3 failed)), Valid(4))
This time, all the futures ran in parallel, and it collected all the successes and failures in a List of ValidatedNel values.
Traverse is kind of like an Apply and a fold at the same time. To understand it, let's take a simpler example:
scala> val list: List[Int] = List(1, 2, 3)
list: List[Int] = List(1, 2, 3)
scala> val traversed: Option[List[String]] = list.traverse(_.toString.some)
traversed: Option[List[String]] = Some(List(1, 2, 3))
What happened? Taking it step by step, it looks like it walked through the list, converted each Int element to an Option[String], then somehow "flipped" the types from List[Option[String]] (which you would get if just calling map) to an Option[List[String]]. Behind the scenes, it's actually using Apply (strictly speaking Applicative, which is Apply plus pure) under a fold to do this.
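One way to picture that is to write the traversal by hand for the List and Option case. This is an illustrative sketch in plain Scala, not the Cats implementation; map2 and traverseOption are hypothetical helpers standing in for Apply[Option].map2 and Traverse[List].traverse.

```scala
// Hand-written stand-in for Apply[Option].map2:
// combine two optional values only if both are present.
def map2[A, B, C](oa: Option[A], ob: Option[B])(f: (A, B) => C): Option[C] =
  for {
    a <- oa
    b <- ob
  } yield f(a, b)

// Hand-written stand-in for traverse, specialised to List and Option.
// The fold starts from Some(Nil) (the role `pure` plays) and uses map2
// to prepend each converted element to the accumulated list.
def traverseOption[A, B](as: List[A])(f: A => Option[B]): Option[List[B]] =
  as.foldRight(Option(List.empty[B])) { (a, acc) =>
    map2(f(a), acc)(_ :: _)
  }

val allPresent: Option[List[String]] =
  traverseOption(List(1, 2, 3))(n => Some(n.toString))

// A single None anywhere makes the whole result None.
val oneMissing: Option[List[String]] =
  traverseOption(List(1, 2, 3))(n => if (n == 2) None else Some(n.toString))
```

Swapping Option's map2 for one that accumulates errors, as Validated's does, is what turns the same fold into the error-collecting behaviour.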
Similarly, for our future example, it's traversing a List[Future[A]], converting each Future[A] to a Future[ValidatedNel[Throwable, A]], then flipping the outer types so a List[Future[ValidatedNel[Throwable, A]]] becomes a Future[List[ValidatedNel[Throwable, A]]]. Because we're using Validated and traverse uses Apply behind the scenes, we get the desired error accumulation behavior "for free".
It's worth mentioning that if you have 2 nested "boxes", e.g. List[Future[String]], then traversing with the identity function simply flips the boxes with no other transformation. This is also called sequence:
scala> :paste
// Entering paste mode (ctrl-D to finish)
def listOfFutures: List[Future[String]] = List(Future.successful("first"), Future.successful("second"))
val sequenced: Future[List[String]] = listOfFutures.traverse(identity)
// same as listOfFutures.sequence
Await.result(sequenced, 1 second)
// Exiting paste mode, now interpreting.
res24: List[String] = List(first, second)
That's all for now...