Skip to content

Instantly share code, notes, and snippets.

@rtitle
Last active January 27, 2021 05:03
Show Gist options
  • Save rtitle/f73d35e79a2f95871bca27d24be3a805 to your computer and use it in GitHub Desktop.
Save rtitle/f73d35e79a2f95871bca27d24be3a805 to your computer and use it in GitHub Desktop.

Open the REPL and import some stuff:

Welcome to Scala 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131).
Type in expressions for evaluation. Or try :help.

scala> import scala.concurrent._, duration._, ExecutionContext.Implicits.global
import scala.concurrent._
import duration._
import ExecutionContext.Implicits.global

Define some successful and failing futures for testing:

scala> :paste
// Entering paste mode (ctrl-D to finish)

def future1: Future[Int] = Future {
  println("future 1 is executing!")
  1
}

def future2: Future[Int] = Future {
  println("future 2 is executing!")
  throw new Exception("Future 2 failed")
}

def future3: Future[Int] = Future {
  println("future 3 is executing!")
  throw new Exception("Future 3 failed")
}

def future4: Future[Int] = Future {
  println("future 4 is executing!")
  4
}

// Exiting paste mode, now interpreting.

future1: scala.concurrent.Future[Int]
future2: scala.concurrent.Future[Int]
future3: scala.concurrent.Future[Int]
future4: scala.concurrent.Future[Int]

If I flatMap the futures, it bails after the first failing future, and the resulting future is a failure.

scala> :paste
// Entering paste mode (ctrl-D to finish)

val res = for {
  f1 <- future1
  f2 <- future2
  f3 <- future3
  f4 <- future4
} yield f1 + f2 + f3 + f4

Await.result(res, 1 second)

// Exiting paste mode, now interpreting.

future 1 is executing!
future 2 is executing!
java.lang.Exception: Future 2 failed
  at $anonfun$future2$1.apply(<console>:27)
  at $anonfun$future2$1.apply(<console>:25)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I could stick a recovery on each of the futures to ensure they all get executed, but that loses the exception information:

scala> :paste
// Entering paste mode (ctrl-D to finish)

def recovery: PartialFunction[Throwable, Int] = {
  case _ => 0
} 

val res = for {
  f1 <- future1 recover recovery
  f2 <- future2 recover recovery
  f3 <- future3 recover recovery
  f4 <- future4 recover recovery
} yield f1 + f2 + f3 + f4

Await.result(res, 1 second)

// Exiting paste mode, now interpreting.

future 1 is executing!
future 2 is executing!
future 3 is executing!
future 4 is executing!
res4: Int = 5

Cats to the rescue!

scala> import cats._, cats.implicits._, cats.data._
import cats._
import cats.implicits._
import cats.data._

Cats has Apply. This lets you "map" over many things in parallel, and act on all the results at once. Here's an example with our futures:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val res = (future1 |@| future2 |@| future3 |@| future4).map(_ + _ + _ + _)
// or equivalently, Apply[Future].map4(future1, future2, future3, future4)(_ + _ + _ + _)

Await.result(res, 1 second)

// Exiting paste mode, now interpreting.

future 1 is executing!
future 2 is executing!
future 3 is executing!
future 4 is executing!
java.lang.Exception: Future 2 failed
  at $anonfun$future2$1.apply(<console>:27)
  at $anonfun$future2$1.apply(<console>:25)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Unlike our for-comprehension, this ran all the futures in parallel. But like the for-comprehension, the overall result is still the first one that failed.

We can do better. Cats also defines the Validated[A, B] class. Its structure is similar to Either[A, B]: it has either an A or a B value, but not both. But its semantics are a little different in cats. Basically, Either is used for "short-circuiting" behavior (i.e., the for-comprehension example above). Validated is used for "error accumulation" behavior. We'll see how this works in a minute.

ValidatedNel[A, B] is just a type alias for Validated[NonEmptyList[A], B]. Cats provides this alias because accumulating errors in a NonEmptyList is a common pattern.

We can convert a Future[A] to a Future[ValidatedNel[Throwable, A]] like so:

def toValidatedNel[A](future: Future[A]): Future[ValidatedNel[Throwable, A]] = {
  future.map(Validated.valid).recover { case e => 
    Validated.invalidNel(e) 
  }
}

However, I like using implicit classes for these types of conversions (maybe a personal preference):

scala> :paste
// Entering paste mode (ctrl-D to finish)

implicit class EnrichedFuture[A](future: Future[A]) {
  def toValidatedNel: Future[ValidatedNel[Throwable, A]] = {
    future.map(Validated.valid).recover { case e => 
      Validated.invalidNel(e) 
    }
  }
}

// Exiting paste mode, now interpreting.

defined class EnrichedFuture

Now let's use Apply with our Validated converter:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val res = (future1.toValidatedNel |@| future2.toValidatedNel |@| future3.toValidatedNel |@| future4.toValidatedNel).map(_ |+| _ |+| _ |+| _)

Await.result(res, 1 second)

// Exiting paste mode, now interpreting.

future 1 is executing!
future 2 is executing!
future 3 is executing!
future 4 is executing!

res7: cats.data.Validated[cats.data.NonEmptyList[Throwable],Int] = Invalid(NonEmptyList(java.lang.Exception: Future 2 failed, java.lang.Exception: Future 3 failed))

Interesting - it still ran all the futures, but this time it collected all the failures in a NonEmptyList. The resulting future is successful with an Invalid(NonEmptyList(Exception, Exception)) inside of it.

Enter Traverse, a seriously awesome typeclass. Here's an example using the data we've built up so far:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val res = List(future1, future2, future3, future4).traverse(_.toValidatedNel)

Await.result(res, 1 second)

// Exiting paste mode, now interpreting.

future 1 is executing!
future 3 is executing!
future 2 is executing!
future 4 is executing!

res11: List[cats.data.ValidatedNel[Throwable,Int]] = List(Valid(1), Invalid(NonEmptyList(java.lang.Exception: Future 2 failed)), Invalid(NonEmptyList(java.lang.Exception: Future 3 failed)), Valid(4))

This time, all the futures ran in parallel, and it collected all the successes and failures in a List of ValidatedNel's.

More explanation about Traverse

Traverse is kind of like an Apply and a fold at the same time. To understand it, let's take a simpler example:

scala> val list: List[Int] = List(1, 2, 3)
list: List[Int] = List(1, 2, 3)

scala> val traversed: Option[List[String]] = list.traverse(_.toString.some)
traversed: Option[List[String]] = Some(List(1, 2, 3))

What happened? Taking it step by step, it looks like it walked through the list, converted each Int element to an Option[String], then somehow "flipped" the types from List[Option[String]] (which you would get if just calling map) to an Option[List[String]]. Behind the scenes, it's actually using Apply under a fold to do this.

Similarly, for our future example, it's traversing a List[Future[A]], converting each Future[A] to a Future[ValidatedNel[Throwable, A]], then flipping the outer types so a List[Future[ValidatedNel[Throwable, A]]] becomes a Future[List[ValidatedNel[Throwable, A]]]. Because we're using Validated and traverse uses Apply behind the scenes, we get the desired error accumulation behavior "for free".

It's worth mentioning that if you have 2 nested "boxes", e.g. List[Future[String]], then traversing with the identity function simply flips the boxes with no other transformation. This is also called sequence:

scala> :paste
// Entering paste mode (ctrl-D to finish)

def listOfFutures: List[Future[String]] = List(Future.successful("first"), Future.successful("second"))

val sequenced: Future[List[String]] = listOfFutures.traverse(identity)
// same as listOfFutures.sequence

Await.result(sequenced, 1 second)

// Exiting paste mode, now interpreting.

res24: List[String] = List(first, second)

That's all for now...

@butcherless
Copy link

Great job. Just what I was looking for. Very well explained and smooth and progressive increase in the learning path. Thanks!!!

@rtitle
Copy link
Author

rtitle commented Jul 17, 2019

Glad this was useful!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment