Skip to content

Instantly share code, notes, and snippets.

@danbills
Forked from rtitle/futures and cats.md
Last active May 26, 2017 19:12
Show Gist options
  • Save danbills/7e83c28f0c3cafb71acd45d819fc8a81 to your computer and use it in GitHub Desktop.
Save danbills/7e83c28f0c3cafb71acd45d819fc8a81 to your computer and use it in GitHub Desktop.

Open the REPL and import some stuff:

Welcome to Scala 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131).
Type in expressions for evaluation. Or try :help.

scala> import scala.concurrent._, duration._, ExecutionContext.Implicits.global
import scala.concurrent._
import duration._
import ExecutionContext.Implicits.global

Define some successful and failing futures for testing:

scala> :paste
// Entering paste mode (ctrl-D to finish)

def future1: Future[Int] = Future {
  println("future 1 is executing!")
  1
}

def future2: Future[Int] = Future {
  println("future 2 is executing!")
  throw new Exception("Future 2 failed")
}

def future3: Future[Int] = Future {
  println("future 3 is executing!")
  throw new Exception("Future 3 failed")
}

def future4: Future[Int] = Future {
  println("future 4 is executing!")
  4
}

// Exiting paste mode, now interpreting.

future1: scala.concurrent.Future[Int]
future2: scala.concurrent.Future[Int]
future3: scala.concurrent.Future[Int]
future4: scala.concurrent.Future[Int]

If I flatMap the futures, it bails after the first failing future, and the resulting future is a failure.

scala> :paste
// Entering paste mode (ctrl-D to finish)

val res = for {
  f1 <- future1
  f2 <- future2
  f3 <- future3
  f4 <- future4
} yield f1 + f2 + f3 + f4

Await.result(res, 1 second)

// Exiting paste mode, now interpreting.

future 1 is executing!
future 2 is executing!
java.lang.Exception: Future 2 failed
  at $anonfun$future2$1.apply(<console>:27)
  at $anonfun$future2$1.apply(<console>:25)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I could stick a recovery on each of the futures to ensure they all get executed, but that loses the exception information:

scala> :paste
// Entering paste mode (ctrl-D to finish)

def recovery: PartialFunction[Throwable, Int] = {
  case _ => 0
} 

val res = for {
  f1 <- future1 recover recovery
  f2 <- future2 recover recovery
  f3 <- future3 recover recovery
  f4 <- future4 recover recovery
} yield f1 + f2 + f3 + f4

Await.result(res, 1 second)

// Exiting paste mode, now interpreting.

future 1 is executing!
future 2 is executing!
future 3 is executing!
future 4 is executing!
res4: Int = 5

Cats to the rescue!

scala> import cats._, cats.implicits._, cats.data._
import cats._
import cats.implicits._
import cats.data._

Cats has Apply. This lets you "map" over many things in parallel, and act on all the results at once. Here's an example with our futures:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val res = (future1 |@| future2 |@| future3 |@| future4).map(_ + _ + _ + _)
// or equivalently, Apply[Future].map4(future1, future2, future3, future4)(_ + _ + _ + _)

Await.result(res, 1 second)

// Exiting paste mode, now interpreting.

future 1 is executing!
future 2 is executing!
future 3 is executing!
future 4 is executing!
java.lang.Exception: Future 2 failed
  at $anonfun$future2$1.apply(<console>:27)
  at $anonfun$future2$1.apply(<console>:25)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Unlike our for-comprehension, this ran all the futures in parallel. But like the for-comprehension, the overall result is still the first one that failed.

We can do better. Cats also defines the Validated[A, B] class. Its structure is similar to Either[A, B]: it has either an A or a B value, but not both. But its semantics are a little different in cats. Basically, Either is used for "short-circuiting" behavior (i.e., the for-comprehension example above). Validated is used for "error accumulation" behavior. We'll see how this works in a minute.

ValidatedNel[A, B] is just a type alias for Validated[NonEmptyList[A], B]. Cats provides this alias because accumulating errors in a NonEmptyList is a common pattern.

We can convert a Future[A] to a Future[ValidatedNel[Throwable, A]] like so:

def toValidatedNel[A](future: Future[A]): Future[ValidatedNel[Throwable, A]] = {
  future.map(Validated.valid).recover { case e => 
    Validated.invalidNel(e) 
  }
}

However, I like using implicit classes for these types of conversions (maybe a personal preference):

scala> :paste
// Entering paste mode (ctrl-D to finish)

implicit class EnrichedFuture[A](future: Future[A]) {
  def toValidatedNel: Future[ValidatedNel[Throwable, A]] = {
    future.map(Validated.valid).recover { case e => 
      Validated.invalidNel(e) 
    }
  }
}

// Exiting paste mode, now interpreting.

defined class EnrichedFuture

Now let's use Apply with our Validated converter:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val res = (future1.toValidatedNel |@| future2.toValidatedNel |@| future3.toValidatedNel |@| future4.toValidatedNel).map(_ |+| _ |+| _ |+| _)

Await.result(res, 1 second)

// Exiting paste mode, now interpreting.

future 1 is executing!
future 2 is executing!
future 3 is executing!
future 4 is executing!

res7: cats.data.Validated[cats.data.NonEmptyList[Throwable],Int] = Invalid(NonEmptyList(java.lang.Exception: Future 2 failed, java.lang.Exception: Future 3 failed))

Interesting - it still ran all the futures, but this time it collected all the failures in a NonEmptyList. The resulting future is successful with an Invalid(NonEmptyList(Exception, Exception)) inside of it.

Enter Traverse, a seriously awesome typeclass. Here's an example using the data we've built up so far:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val res = List(future1, future2, future3, future4).traverse(_.toValidatedNel)

Await.result(res, 1 second)

// Exiting paste mode, now interpreting.

future 1 is executing!
future 3 is executing!
future 2 is executing!
future 4 is executing!

res11: List[cats.data.ValidatedNel[Throwable,Int]] = List(Valid(1), Invalid(NonEmptyList(java.lang.Exception: Future 2 failed)), Invalid(NonEmptyList(java.lang.Exception: Future 3 failed)), Valid(4))

This time, all the futures ran in parallel, and it collected all the successes and failures in a List of ValidatedNel's.

More explanation about Traverse

Traverse is kind of like an Apply and a fold at the same time. To understand it, let's take a simpler example:

scala> val list: List[Int] = List(1, 2, 3)
list: List[Int] = List(1, 2, 3)

scala> val traversed: Option[List[String]] = list.traverse(_.toString.some)
traversed: Option[List[String]] = Some(List(1, 2, 3))

What happened? Taking it step by step, it looks like it walked through the list, converted each Int element to an Option[String], then somehow "flipped" the types from List[Option[String]] (which you would get if just calling map) to an Option[List[String]]. Behind the scenes, it's actually using Apply under a fold to do this.

Similarly, for our future example, it's traversing a List[Future[A]], converting each Future[A] to a Future[ValidatedNel[Throwable, A]], then flipping the outer types so a List[Future[ValidatedNel[Throwable, A]]] becomes a Future[List[ValidatedNel[Throwable, A]]]. Because we're using Validated and traverse uses Apply behind the scenes, we get the desired error accumulation behavior "for free".

It's worth mentioning that if you have 2 nested "boxes", e.g. List[Future[String]], then traversing with the identity function simply flips the boxes with no other transformation. This is also called sequence:

scala> :paste
// Entering paste mode (ctrl-D to finish)

def listOfFutures: List[Future[String]] = List(Future.successful("first"), Future.successful("second"))

val sequenced: Future[List[String]] = listOfFutures.traverse(identity)
// same as listOfFutures.sequence

Await.result(sequenced, 1 second)

// Exiting paste mode, now interpreting.

res24: List[String] = List(first, second)

That's all for now...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment