

@rtitle
Last active January 27, 2021 05:03

Open the REPL and import some stuff:

Welcome to Scala 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131).
Type in expressions for evaluation. Or try :help.

scala> import scala.concurrent._, duration._, ExecutionContext.Implicits.global
import scala.concurrent._
import duration._
import ExecutionContext.Implicits.global

Define some successful and failing futures for testing:

scala> :paste
// Entering paste mode (ctrl-D to finish)

def future1: Future[Int] = Future {
  println("future 1 is executing!")
  1
}

def future2: Future[Int] = Future {
  println("future 2 is executing!")
  throw new Exception("Future 2 failed")
}

def future3: Future[Int] = Future {
  println("future 3 is executing!")
  throw new Exception("Future 3 failed")
}

def future4: Future[Int] = Future {
  println("future 4 is executing!")
  4
}

// Exiting paste mode, now interpreting.

future1: scala.concurrent.Future[Int]
future2: scala.concurrent.Future[Int]
future3: scala.concurrent.Future[Int]
future4: scala.concurrent.Future[Int]

If I flatMap the futures (which is what the for-comprehension below does), it bails after the first failing future: future 3 and future 4 never even start, and the resulting future is a failure.

scala> :paste
// Entering paste mode (ctrl-D to finish)

val res = for {
  f1 <- future1
  f2 <- future2
  f3 <- future3
  f4 <- future4
} yield f1 + f2 + f3 + f4

Await.result(res, 1 second)

// Exiting paste mode, now interpreting.

future 1 is executing!
future 2 is executing!
java.lang.Exception: Future 2 failed
  at $anonfun$future2$1.apply(<console>:27)
  at $anonfun$future2$1.apply(<console>:25)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
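To see why the chain stops, here is a small sketch using only the standard library (paste it into a Scala 2.12+ REPL with :paste). The helpers ok and boom and the started counter are made up for illustration. Each generator in a for-comprehension is evaluated inside the previous flatMap, so a def-based future that comes after a failure is never created:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical counter: how many future bodies actually started running.
val started = new AtomicInteger(0)

// Hypothetical helpers standing in for future1..future4.
def ok(n: Int): Future[Int] = Future { started.incrementAndGet(); n }
def boom(msg: String): Future[Int] = Future { started.incrementAndGet(); throw new Exception(msg) }

// Desugars to ok(1).flatMap(a => boom(...).flatMap(b => ...)),
// so each later future is only created once the previous one has succeeded.
val chained: Future[Int] = for {
  a <- ok(1)
  b <- boom("Future 2 failed")
  c <- boom("Future 3 failed")
  d <- ok(4)
} yield a + b + c + d

// Await.result rethrows the failure; Try captures it so we can inspect it.
val outcome = Try(Await.result(chained, 1.second))
```

After this runs, outcome is a Failure carrying "Future 2 failed", and started is 2: the third and fourth futures were never built.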

I could stick a recovery on each of the futures to ensure they all get executed, but that loses the exception information:

scala> :paste
// Entering paste mode (ctrl-D to finish)

def recovery: PartialFunction[Throwable, Int] = {
  case _ => 0
} 

val res = for {
  f1 <- future1 recover recovery
  f2 <- future2 recover recovery
  f3 <- future3 recover recovery
  f4 <- future4 recover recovery
} yield f1 + f2 + f3 + f4

Await.result(res, 1 second)

// Exiting paste mode, now interpreting.

future 1 is executing!
future 2 is executing!
future 3 is executing!
future 4 is executing!
res4: Int = 5
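Without cats, one standard-library middle ground is to recover into an Either instead of a default value: every future still runs, but the exception survives. This is only a sketch; attempt is a made-up helper name, and the inline futures stand in for future1..future4:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper: success becomes Right(value), failure becomes Left(exception),
// so the resulting future itself never fails.
def attempt[A](f: Future[A]): Future[Either[Throwable, A]] =
  f.map(a => Right(a): Either[Throwable, A]).recover { case e => Left(e) }

val attempts: Future[List[Either[Throwable, Int]]] = for {
  r1 <- attempt(Future(1))
  r2 <- attempt(Future[Int](throw new Exception("Future 2 failed")))
  r3 <- attempt(Future[Int](throw new Exception("Future 3 failed")))
  r4 <- attempt(Future(4))
} yield List(r1, r2, r3, r4)

val results = Await.result(attempts, 1.second)
// Successes and failures side by side; no exception is thrown away.
val errors = results.collect { case Left(e) => e.getMessage }
val total  = results.collect { case Right(n) => n }.sum
```

This keeps the information, but collecting the errors is left to us by hand, which is the bookkeeping cats can do for us.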

Cats to the rescue!

scala> import cats._, cats.implicits._, cats.data._
import cats._
import cats.implicits._
import cats.data._

Cats has Apply. This lets you "map" a single function over several independent values at once, and act on all the results together. Here's an example with our futures:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val res = (future1 |@| future2 |@| future3 |@| future4).map(_ + _ + _ + _)
// or equivalently, Apply[Future].map4(future1, future2, future3, future4)(_ + _ + _ + _)

Await.result(res, 1 second)

// Exiting paste mode, now interpreting.

future 1 is executing!
future 2 is executing!
future 3 is executing!
future 4 is executing!
java.lang.Exception: Future 2 failed
  at $anonfun$future2$1.apply(<console>:27)
  at $anonfun$future2$1.apply(<console>:25)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

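Unlike our for-comprehension, this ran all four futures concurrently: each def is called while the expression is being built, so every future has already started before they are combined. But like the for-comprehension, the overall result is still a failure carrying only one error (the first one, from future 2).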
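The concurrency comes from when the futures are created, not from Apply itself. A standard-library sketch (the vals f1 to f4 are illustrative): if the futures are created up front and only then chained, they all start at once, yet the combined result still carries just one error:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// vals, not defs: all four futures are submitted to the thread pool right here.
val f1 = Future(1)
val f2 = Future[Int](throw new Exception("Future 2 failed"))
val f3 = Future[Int](throw new Exception("Future 3 failed"))
val f4 = Future(4)

// Chaining already-running futures does not delay them; it only decides
// which result we end up looking at. flatMap still stops at f2's failure.
val combined: Future[Int] = for {
  a <- f1
  b <- f2
  c <- f3
  d <- f4
} yield a + b + c + d

val outcome = Try(Await.result(combined, 1.second))
// f3 also runs (it was started above), but its error never reaches `outcome`.
val f3Outcome = Try(Await.result(f3, 1.second))
```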

We can do better. Cats also defines the Validated[A, B] data type. Its structure is similar to Either[A, B]: it holds either an A or a B value, but not both. Its semantics in cats are different, though: Either is used for "short-circuiting" behavior (as in the for-comprehension example above), while Validated is used for "error accumulation" behavior. Validated has an Applicative instance but deliberately no Monad instance (so no flatMap), which is what lets it combine two Invalid values by joining their errors with a Semigroup on A. We'll see how this works in a minute.

ValidatedNel[A, B] is just a type alias for Validated[NonEmptyList[A], B]. Cats provides this alias because accumulating errors in a NonEmptyList is a common pattern.
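To make the accumulation idea concrete, here is a tiny hand-rolled stand-in for ValidatedNel. It is not cats' implementation; Checked, Ok, Errs and map2 are made-up names. The key difference from flatMap is that map2 looks at both sides, so two failures are concatenated instead of stopping at the first:

```scala
// Hypothetical stand-in for ValidatedNel[Throwable, A].
sealed trait Checked[+A]
final case class Ok[+A](value: A) extends Checked[A]
// Non-empty by construction: one head error plus any further errors.
final case class Errs(head: Throwable, tail: List[Throwable]) extends Checked[Nothing] {
  def all: List[Throwable] = head :: tail
}

// Apply-style combination that accumulates errors from both arguments.
def map2[A, B, C](fa: Checked[A], fb: Checked[B])(f: (A, B) => C): Checked[C] =
  (fa, fb) match {
    case (Ok(a), Ok(b))       => Ok(f(a, b))
    case (e: Errs, Ok(_))     => e
    case (Ok(_), e: Errs)     => e
    case (e1: Errs, e2: Errs) => Errs(e1.head, e1.tail ++ e2.all)
  }

val e2 = new Exception("Future 2 failed")
val e3 = new Exception("Future 3 failed")

val c1: Checked[Int] = Ok(1)
val c2: Checked[Int] = Errs(e2, Nil)
val c3: Checked[Int] = Errs(e3, Nil)
val c4: Checked[Int] = Ok(4)

// Same shape as combining four values with Apply, done pairwise.
val combined: Checked[Int] = map2(map2(map2(c1, c2)(_ + _), c3)(_ + _), c4)(_ + _)
```

Here combined is Errs holding both exceptions in order, while map2(c1, c4)(_ + _) is Ok(5).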

We can convert a Future[A] to a Future[ValidatedNel[Throwable, A]] like so:

def toValidatedNel[A](future: Future[A]): Future[ValidatedNel[Throwable, A]] = {
  future.map(Validated.valid).recover { case e => 
    Validated.invalidNel(e) 
  }
}

However, I like using implicit classes for these types of conversions (maybe a personal preference):

scala> :paste
// Entering paste mode (ctrl-D to finish)

implicit class EnrichedFuture[A](future: Future[A]) {
  def toValidatedNel: Future[ValidatedNel[Throwable, A]] = {
    future.map(Validated.valid).recover { case e => 
      Validated.invalidNel(e) 
    }
  }
}

// Exiting paste mode, now interpreting.

defined class EnrichedFuture

Now let's use Apply with our Validated converter:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val res = (future1.toValidatedNel |@| future2.toValidatedNel |@| future3.toValidatedNel |@| future4.toValidatedNel).map(_ |+| _ |+| _ |+| _)

Await.result(res, 1 second)

// Exiting paste mode, now interpreting.

future 1 is executing!
future 2 is executing!
future 3 is executing!
future 4 is executing!

res7: cats.data.Validated[cats.data.NonEmptyList[Throwable],Int] = Invalid(NonEmptyList(java.lang.Exception: Future 2 failed, java.lang.Exception: Future 3 failed))

Interesting - it still ran all the futures, but this time it collected all the failures in a NonEmptyList. The resulting future is successful with an Invalid(NonEmptyList(Exception, Exception)) inside of it.

Enter Traverse, a seriously awesome typeclass. Here's an example using the data we've built up so far:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val res = List(future1, future2, future3, future4).traverse(_.toValidatedNel)

Await.result(res, 1 second)

// Exiting paste mode, now interpreting.

future 1 is executing!
future 3 is executing!
future 2 is executing!
future 4 is executing!

res11: List[cats.data.ValidatedNel[Throwable,Int]] = List(Valid(1), Invalid(NonEmptyList(java.lang.Exception: Future 2 failed)), Invalid(NonEmptyList(java.lang.Exception: Future 3 failed)), Valid(4))

This time, all the futures ran in parallel, and it collected all the successes and failures in a List of ValidatedNel values.

More explanation about Traverse

Traverse is kind of like an Apply and a fold at the same time. To understand it, let's take a simpler example:

scala> val list: List[Int] = List(1, 2, 3)
list: List[Int] = List(1, 2, 3)

scala> val traversed: Option[List[String]] = list.traverse(_.toString.some)
traversed: Option[List[String]] = Some(List(1, 2, 3))

What happened? Taking it step by step, it walked through the list, converted each Int element to an Option[String], and then "flipped" the types from List[Option[String]] (which is what you would get from a plain map) to Option[List[String]]. Behind the scenes, it does this by running an Applicative (Apply plus pure, which supplies the result for an empty list) under a fold.

Similarly, in our future example, it traverses a List[Future[A]], converts each Future[A] to a Future[ValidatedNel[Throwable, A]], and then flips the outer types so that a List[Future[ValidatedNel[Throwable, A]]] becomes a Future[List[ValidatedNel[Throwable, A]]]. The Applicative doing the combining here is Future's, but because every future has already been recovered into a ValidatedNel, none of them fails at the Future level: nothing short-circuits, and we get every success and every failure "for free". Collapsing that list into a single ValidatedNel is a separate step.
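The "Applicative under a fold" idea fits in a few lines if we specialise it to Option by hand. This sketch is not how cats is written internally (map2Option and traverseOption are made-up names); it only shows why an empty list needs pure (the Some(Nil) seed) and why a single None makes the whole result None:

```scala
// map2 for Option: the result is Some only when both inputs are Some.
def map2Option[A, B, C](oa: Option[A], ob: Option[B])(f: (A, B) => C): Option[C] =
  (oa, ob) match {
    case (Some(a), Some(b)) => Some(f(a, b))
    case _                  => None
  }

// traverse = fold over the list, combining each mapped element with map2.
// The seed Some(Nil) plays the role of `pure` for the empty list.
def traverseOption[A, B](as: List[A])(f: A => Option[B]): Option[List[B]] =
  as.foldRight(Option(List.empty[B])) { (a, acc) =>
    map2Option(f(a), acc)((b, bs) => b :: bs)
  }
```

For example, traverseOption(List(1, 2, 3))(i => Some(i.toString)) gives Some(List("1", "2", "3")), matching the cats result above.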

It's worth mentioning that if you have 2 nested "boxes", e.g. List[Future[String]], then traversing with the identity function simply flips the boxes with no other transformation. This is also called sequence:

scala> :paste
// Entering paste mode (ctrl-D to finish)

def listOfFutures: List[Future[String]] = List(Future.successful("first"), Future.successful("second"))

val sequenced: Future[List[String]] = listOfFutures.traverse(identity)
// same as listOfFutures.sequence

Await.result(sequenced, 1 second)

// Exiting paste mode, now interpreting.

res24: List[String] = List(first, second)
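For comparison, the standard library ships Future-specific versions of both operations, Future.traverse and Future.sequence. A short sketch (the sample data is illustrative). Note that, unlike traversing into Validated, these give up with the first failure:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

val words = List("first", "second")

// traverse: map each element to a Future, then flip List[Future[_]] into Future[List[_]].
val traversed: Future[List[Int]] = Future.traverse(words)(w => Future(w.length))

// sequence: the boxes are already there, so it only flips them.
val sequenced: Future[List[String]] = Future.sequence(words.map(w => Future.successful(w)))

// With a failed element, the flipped Future simply fails (no accumulation).
val withFailure: Future[List[String]] =
  Future.sequence(List(Future.successful("first"), Future.failed[String](new Exception("boom"))))
```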

That's all for now...

@kshakir

kshakir commented May 26, 2017

Awesome! Thanks for the write up. I believe I only need the ValidatedNel[Throwable,Int], so that all the errors can be reported back to the user/admin, instead of just the first.

Hope you're willing to continue sharing in the future. As an example, I went looking for more info on Traverse, and almost drowned in a well of Haskell on eed3si9n's blog. Luckily, cats' docs threw me a rescue line bringing me back to the land of Scala.

For those who also want to play with a live version of the above examples, I started a Scastie here.

@rtitle
Author

rtitle commented May 26, 2017

Thanks @kshakir, glad you liked it. Yeah I'm not sure this is any better than the existing cats writeups/tutorials, but writing it makes me understand it better at least. :)

I did throw up some more info about Traverse, as I understand it. The cats docs are pretty helpful here.

@danbills

peep this:


We have a Future[List[Validated]]; do we want a Future[Validated[List]]?

How do we do that?

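It turns out that applicative instances can be combined ("composed"): if F and G are both applicatives, the nested type F[G[A]] has an applicative too, so Future[ValidatedNel[Throwable, A]] does.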

//Pull in the compose instances we need
scala> cats.data.Validated.catsDataApplicativeErrorForValidated[NonEmptyList[Throwable]]
res24: cats.ApplicativeError[[β$2$]cats.data.Validated[cats.data.NonEmptyList[Throwable],β$2$],cats.data.NonEmptyList[Throwable]] = cats.data.ValidatedInstances$$anon$1@6e35a59a

scala> cats.instances.future.catsStdInstancesForFuture
res26: cats.MonadError[scala.concurrent.Future,Throwable] with cats.CoflatMap[scala.concurrent.Future] with cats.Monad[scala.concurrent.Future] = cats.instances.FutureInstances$$anon$1@43d11948

scala> res26 compose res24
res27: cats.Applicative[[α]scala.concurrent.Future[cats.data.Validated[cats.data.NonEmptyList[Throwable],α]]] = cats.Applicative$$anon$1@17b11a80

//Now we have an Applicative that can deal with Future[ValidatedNel]!

scala> type C[A] = Future[ValidatedNel[Throwable, A]]
defined type alias C

scala> List(future1, future2, future3, future4).traverse[C, Int](_.toValidatedNel)(res27)
future 1 is executing!
future 3 is executing!
future 2 is executing!
future 4 is executing!
res28: C[List[Int]] = Future(<not completed>)

scala> Await.result(res28, 1 second)
res29: cats.data.ValidatedNel[Throwable,List[Int]] = Invalid(NonEmptyList(java.lang.Exception: Future 2 failed, java.lang.Exception: Future 3 failed))

Hooray! We have a Future[ValidatedNel[Throwable, List[Int]]].
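What compose buys us can be spelled out by hand: the composed map2 first combines the outer Futures, then combines the inner values with an error-accumulating map2. In this standard-library sketch, Either[List[Throwable], A] stands in for ValidatedNel[Throwable, A], and all names (Checked, map2Checked, map2Composed, traverseComposed, toChecked) are made up for illustration:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for ValidatedNel[Throwable, A]; Left holds all errors so far.
type Checked[A] = Either[List[Throwable], A]

// Inner map2: keeps the errors from BOTH sides when both have failed.
def map2Checked[A, B, C](x: Checked[A], y: Checked[B])(f: (A, B) => C): Checked[C] =
  (x, y) match {
    case (Right(a), Right(b)) => Right(f(a, b))
    case (Left(e1), Left(e2)) => Left(e1 ++ e2)
    case (Left(e1), Right(_)) => Left(e1)
    case (Right(_), Left(e2)) => Left(e2)
  }

// Composed map2: combine the outer Futures, then the inner Checked values.
def map2Composed[A, B, C](x: Future[Checked[A]], y: Future[Checked[B]])(f: (A, B) => C): Future[Checked[C]] =
  x.flatMap(ca => y.map(cb => map2Checked(ca, cb)(f)))

// traverse written against the composed map2; a successful empty list acts as `pure`.
def traverseComposed[A, B](as: List[A])(f: A => Future[Checked[B]]): Future[Checked[List[B]]] =
  as.foldRight(Future.successful[Checked[List[B]]](Right(Nil))) { (a, acc) =>
    map2Composed(f(a), acc)((b, bs) => b :: bs)
  }

// Same role as toValidatedNel: a failed Future becomes a successful Left.
def toChecked[A](fa: Future[A]): Future[Checked[A]] =
  fa.map(a => Right(a): Checked[A]).recover { case e => Left(List(e)) }

val inputs: List[Future[Int]] = List(
  Future.successful(1),
  Future.failed(new Exception("Future 2 failed")),
  Future.failed(new Exception("Future 3 failed")),
  Future.successful(4))

val result: Checked[List[Int]] = Await.result(traverseComposed(inputs)(fa => toChecked(fa)), 1.second)
```

The result is a Left with both exceptions, which is the same shape as the Invalid(NonEmptyList(...)) above.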

@rtitle
Author

rtitle commented May 26, 2017

👍 Thanks @danbills! Composing Applicatives == awesome. Not sure I would have come up with that.

Could we do this in the following way as well? Say we define reduce in terms of traverse (this is straight from eed3si9n):

scala> :paste
// Entering paste mode (ctrl-D to finish)

def reduce[A, B: Monoid, F[_]: Traverse](fa: F[A])(f: A => B): B = {
  val x = fa.traverseU(a => Const(f(a)))
  x.getConst
}

// Exiting paste mode, now interpreting.

reduce: [A, B, F[_]](fa: F[A])(f: A => B)(implicit evidence$1: cats.Monoid[B], implicit evidence$2: cats.Traverse[F])B

Traversing with Const accumulates the results in a Monoid instead of building a new structure. Since cats has a Monoid for Future[ValidatedNel[Throwable, Int]], we can get one of those out of it:

scala> reduce(List(future1, future2, future3, future4))(_.toValidatedNel)
future 1 is executing!
future 3 is executing!
future 4 is executing!
future 2 is executing!
res55: scala.concurrent.Future[cats.data.ValidatedNel[Throwable,Int]] = List()

scala> Await.result(res55, 1 second)
res56: cats.data.ValidatedNel[Throwable,Int] = Invalid(NonEmptyList(java.lang.Exception: Future 2 failed, java.lang.Exception: Future 3 failed))

Or, if we also wrap each success in a singleton List (so the Monoid concatenates Lists instead of adding Ints), we can get our Future[ValidatedNel[Throwable, List[Int]]].

scala> reduce(List(future1, future2, future3, future4))(_.toValidatedNel.map(_.map(List(_))))
future 1 is executing!
future 2 is executing!
future 3 is executing!
future 4 is executing!
res58: scala.concurrent.Future[cats.data.Validated[cats.data.NonEmptyList[Throwable],List[Int]]] = List()

scala> Await.result(res58, 1 second)
res59: cats.data.Validated[cats.data.NonEmptyList[Throwable],List[Int]] = Invalid(NonEmptyList(java.lang.Exception: Future 2 failed, java.lang.Exception: Future 3 failed))

@kshakir ^

@rtitle
Author

rtitle commented May 26, 2017

Er, I guess we don't even need to define reduce, it's just foldMap:

scala> Await.result(List(future1, future2, future3, future4).foldMap(_.toValidatedNel), 1 second)
warning: there was one feature warning; re-run with -feature for details
future 1 is executing!
future 3 is executing!
future 4 is executing!
future 2 is executing!
res2: cats.data.ValidatedNel[Throwable,Int] = Invalid(NonEmptyList(java.lang.Exception: Future 2 failed, java.lang.Exception: Future 3 failed))

scala> Await.result(List(future1, future2, future3, future4).foldMap(_.toValidatedNel.map(_.map(List(_)))), 1 second)
warning: there was one feature warning; re-run with -feature for details
future 1 is executing!
future 2 is executing!
future 4 is executing!
future 3 is executing!
res5: cats.data.Validated[cats.data.NonEmptyList[Throwable],List[Int]] = Invalid(NonEmptyList(java.lang.Exception: Future 2 failed, java.lang.Exception: Future 3 failed))
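foldMap is "map every element into a Monoid, then combine them all". A minimal sketch with a hand-rolled Monoid type class (Monoid, foldMap and the instances are hypothetical, not cats' definitions; right-biased Either needs Scala 2.12+) shows why the result type decides what gets accumulated: Ints are added, while wrapping each success in a singleton List concatenates them:

```scala
// Hypothetical minimal Monoid type class (cats has its own, richer one).
trait Monoid[A] {
  def empty: A
  def combine(x: A, y: A): A
}

implicit val intAddition: Monoid[Int] = new Monoid[Int] {
  def empty: Int = 0
  def combine(x: Int, y: Int): Int = x + y
}

implicit def listConcat[A]: Monoid[List[A]] = new Monoid[List[A]] {
  def empty: List[A] = Nil
  def combine(x: List[A], y: List[A]): List[A] = x ++ y
}

// Error-accumulating monoid for Either: Lefts are concatenated, Rights combined.
implicit def eitherMonoid[E, A](implicit A: Monoid[A]): Monoid[Either[List[E], A]] =
  new Monoid[Either[List[E], A]] {
    def empty: Either[List[E], A] = Right(A.empty)
    def combine(x: Either[List[E], A], y: Either[List[E], A]): Either[List[E], A] =
      (x, y) match {
        case (Right(a), Right(b)) => Right(A.combine(a, b))
        case (Left(e1), Left(e2)) => Left(e1 ++ e2)
        case (Left(e1), Right(_)) => Left(e1)
        case (Right(_), Left(e2)) => Left(e2)
      }
  }

// foldMap: map each element into the monoid, then combine from left to right.
def foldMap[A, B](as: List[A])(f: A => B)(implicit B: Monoid[B]): B =
  as.foldLeft(B.empty)((acc, a) => B.combine(acc, f(a)))

val results: List[Either[List[String], Int]] =
  List(Right(1), Left(List("Future 2 failed")), Left(List("Future 3 failed")), Right(4))

// Like foldMap(_.toValidatedNel): successes are summed.
val summed = foldMap(results)(r => r)
// Like foldMap(_.toValidatedNel.map(_.map(List(_)))): successes are collected.
val collected = foldMap(results)(r => r.map(n => List(n)))
```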

@kshakir

kshakir commented May 30, 2017

Thanks for all the help so far. I've got more questions that we can maybe discuss in person when there's time. As a preview, here's a code dump of a few different ways I'm experimenting with solving the same problem. I'm eager to learn best practices for simplifying this code, and patterns/anti-patterns that should be cleaned up.

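import cats.Applicative
import cats.data._
import cats.data.Validated.{Invalid, Valid}
import cats.implicits._
import scala.concurrent.Future
import scala.util.Try
// Assumes an implicit ExecutionContext is in scope (for example,
// scala.concurrent.ExecutionContext.Implicits.global).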

type ErrorOr[A] = ValidatedNel[Throwable, A]
type FutureErrorOr[A] = Future[ErrorOr[A]]

implicit class ValidatedFuture[A](val future: Future[A]) /* extends AnyVal */ {
  /* Interesting note on typeclasses, this causes a compilation error for function1().result2 --
  def toFutureErrorOr: FutureErrorOr[A] = {
  */
  def toFutureErrorOr: Future[ErrorOr[A]] = {
    future map (value => value.valid) recover { case exception => exception.invalidNel }
  }
}

implicit class ValidatedFutureErrorOr[A](val futureErrorOr: FutureErrorOr[A]) /* extends AnyVal */ {
  def toFuture(errorPrefix: String): Future[A] = {
    futureErrorOr flatMap {
      case Valid(value) => Future.successful(value)
      case Invalid(nel) =>
        val message = nel.toList.mkString(s"$errorPrefix\n  ", "\n  ", "")
        val exception = new RuntimeException(message)
        nel.toList.foreach(exception.addSuppressed) // attach each original error as a suppressed exception
        Future.failed(exception)
    }
  }
}

def createOrUpdateIndex(indexDefiner: ElasticsearchIndexDefiner[_]): Future[Unit] = {
  for {
    index <- Future.fromTry(Try(indexDefiner.indexDefinition))
    exists <- elasticsearchDAO.existsIndexType(index)
    _ <- if (exists) Future.successful(()) else elasticsearchDAO.createIndexType(index)
    _ <- elasticsearchDAO.updateFieldDefinitions(index)
  } yield ()
}

def function1(): Future[Unit] = {
  val result1: List[Future[Unit]] = ElasticsearchIndexDefiners.All.toList.map(createOrUpdateIndex)
  val result2: Future[List[ErrorOr[Unit]]] = result1.traverse(_.toFutureErrorOr)
  val result3: FutureErrorOr[List[Unit]] = result2.map(_.sequence)
  val result4: FutureErrorOr[Unit] = result3.map(_.map(_.combineAll))
  val result5: Future[Unit] = result4.toFuture(errorPrefix = "Index creation failed:")
  result5
}

def function2(): Future[Unit] = {
  val result1: List[Future[Unit]] = ElasticsearchIndexDefiners.All.toList.map(createOrUpdateIndex)
  val result3: FutureErrorOr[List[Unit]] = result1.foldMap(_.toFutureErrorOr.map(_.map(List(_))))
  val result4: FutureErrorOr[Unit] = result3.map(_.map(_.combineAll))
  val result5: Future[Unit] = result4.toFuture(errorPrefix = "Index creation failed:")
  result5
}

def function3(): Future[Unit] = {
  val applicativeFuture: Applicative[Future] = implicitly
  val applicativeErrorOr: Applicative[ErrorOr] = implicitly
  val applicativeFutureErrorOr: Applicative[FutureErrorOr] = applicativeFuture compose applicativeErrorOr
  implicit val implicitForResult3 = applicativeFutureErrorOr

  val result1: List[Future[Unit]] = ElasticsearchIndexDefiners.All.toList.map(createOrUpdateIndex)
  /* Compilation error if just using traverse() instead of the explicit traverse[FutureErrorOr, Unit]() */
  val result3: FutureErrorOr[List[Unit]] = result1.traverse[FutureErrorOr, Unit](_.toFutureErrorOr)
  val result4: FutureErrorOr[Unit] = result3.map(_.map(_.combineAll))
  val result5: Future[Unit] = result4.toFuture(errorPrefix = "Index creation failed:")
  result5
}

def function4(): Future[Unit] = {
  /* Explicitly run indexers serially. Only have one index so far, so as an example running it three times. */
  val result2: Future[List[ErrorOr[Unit]]] = for {
    first <- createOrUpdateIndex(ElasticsearchIndexDefiners.ReadGroups).toFutureErrorOr
    second <- createOrUpdateIndex(ElasticsearchIndexDefiners.ReadGroups).toFutureErrorOr
    third <- createOrUpdateIndex(ElasticsearchIndexDefiners.ReadGroups).toFutureErrorOr
  } yield List(first, second, third)
  val result3: FutureErrorOr[List[Unit]] = result2.map(_.sequence)
  val result4: FutureErrorOr[Unit] = result3.map(_.map(_.combineAll))
  val result5: Future[Unit] = result4.toFuture(errorPrefix = "Index creation failed:")
  result5
}

def function5(): Future[Unit] = {
  /* Fold across the indexers, using flatMap to run serially. */
  val initial: Future[List[ErrorOr[Unit]]] = Future.successful(Nil)
  val result2: Future[List[ErrorOr[Unit]]] = ElasticsearchIndexDefiners.All.toList.foldLeft(initial) {
    (acc, index) =>
      for {
        list <- acc
        next <- createOrUpdateIndex(index).toFutureErrorOr
      } yield list :+ next
  }
  val result3: FutureErrorOr[List[Unit]] = result2.map(_.sequence)
  val result4: FutureErrorOr[Unit] = result3.map(_.map(_.combineAll))
  val result5: Future[Unit] = result4.toFuture(errorPrefix = "Index creation failed:")
  result5
}

@rtitle
Author

rtitle commented May 31, 2017

I like function2() but I think it can be simplified:

def function2(): Future[Unit] = {
  val result1: List[Future[Unit]] = ElasticsearchIndexDefiners.All.toList.map(createOrUpdateIndex)
  val result2: Future[Unit] = result1.foldMap(_.toFutureErrorOr).toFuture(errorPrefix = "Index creation failed")
  result2
}

What it's doing is invoking a Monoid instance for FutureErrorOr[Unit], i.e. Future[ValidatedNel[Throwable, Unit]]. As it happens, cats provides monoids for all of these types (combining two Units just gives back ()), so this compiles and gives you what you want: a Future[Unit] whose failure, via toFuture, lists every exception message.

@kshakir

kshakir commented Jun 1, 2017

I'm still cleaning up my test code, but early indications are that foldMapM, a mishmash of foldM and foldMap, may do what I ultimately want. I think I can make the futures run serially instead of all at once, as long as I create each future inside the call to foldMapM. The internal call to map seems to do what I want: it delays the execution of the next mapping until the previous one has finished.
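The serial idea can be sketched with the standard library alone: wrap each job in a thunk (() => Future[A]) so that nothing starts until the fold reaches it, and chain the steps with flatMap. This is an analogue of the approach described above, not cats' foldMapM; runSerially, record and job are hypothetical names:

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Runs the thunks one after another; a failure is kept as a Left and does not stop the rest.
def runSerially[A](jobs: List[() => Future[A]]): Future[List[Either[Throwable, A]]] =
  jobs.foldLeft(Future.successful(Vector.empty[Either[Throwable, A]])) { (accF, job) =>
    accF.flatMap { acc =>
      // job() is only invoked here, after every earlier job has completed.
      job().map(a => acc :+ Right(a)).recover { case e => acc :+ Left(e) }
    }
  }.map(_.toList)

// Hypothetical execution log, to make the ordering visible.
val log = ListBuffer.empty[String]
def record(entry: String): Unit = log.synchronized { log += entry; () }

def job(n: Int): () => Future[Int] = () => Future {
  record(s"start $n")
  if (n == 2) { record(s"fail $n"); throw new Exception(s"job $n failed") }
  record(s"end $n")
  n
}

val results = Await.result(runSerially(List(job(1), job(2), job(3))), 1.second)
val entries = log.synchronized(log.toList)
```

The log shows each job finishing before the next one starts, and the failed job is reported as a Left without stopping job 3.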

@butcherless

Great job. Just what I was looking for. Very well explained, with a smooth, progressive learning path. Thanks!!!

@rtitle
Author

rtitle commented Jul 17, 2019

Glad this was useful!
