Skip to content

Instantly share code, notes, and snippets.

@sortega
Last active January 8, 2018 13:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sortega/4cbe6f648d2c69449a54e89e79613e1a to your computer and use it in GitHub Desktop.
Save sortega/4cbe6f648d2c69449a54e89e79613e1a to your computer and use it in GitHub Desktop.
Future.traverse is not fail fast
package sandbox.futures
import scala.collection.generic.CanBuildFrom
import scala.concurrent._
import scala.concurrent.duration._
import scala.language.higherKinds
import scala.util.{Random, Success}
import scala.util.control.NoStackTrace
/** A `Future` traversal that stops doing work as soon as one element fails. */
object FailFastTraverse {

  /** Applies `fn` to the elements of `in` one after another and collects the
    * results into a collection of the same shape.
    *
    * `fn` is only invoked for an element once the futures of all previous
    * elements have completed successfully. After the first failure `fn` is not
    * called again and the returned future fails with that first error.
    *
    * @param in  the elements to traverse, in iteration order
    * @param fn  produces the future result for a single element
    * @param cbf builder factory used to assemble the resulting `M[B]`
    * @param ec  execution context on which the chained callbacks run
    * @return a future of all results, or the first failure encountered
    */
  def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(
      implicit cbf: CanBuildFrom[M[A], B, M[B]],
      ec: ExecutionContext): Future[M[B]] = {
    val emptyBuilder = Future.successful(cbf(in))
    val filledBuilder = in.foldLeft(emptyBuilder) { (acc, element) =>
      // The generator on `acc` short-circuits on failure, so `fn(element)` is
      // only evaluated when everything before it succeeded.
      for {
        builder <- acc
        value <- fn(element)
      } yield builder += value
    }
    filledBuilder.map(_.result())
  }
}
/** Manual demo: traverses 1 to 100 with a slow function that fails at random,
  * printing each element as it is processed.
  */
object FailFastTest extends App {
  import scala.concurrent.ExecutionContext.Implicits.global

  /** Blocks the calling thread for a random 100 to 5099 milliseconds. */
  def sleep(): Unit = {
    val delayMillis = 100 + Random.nextInt(5000)
    Thread.sleep(delayMillis)
  }

  val result = FailFastTraverse.traverse(1 to 100) { n =>
    sleep()
    println(n)
    // Coin flip: each element either succeeds with itself or fails.
    val succeeds = Random.nextBoolean()
    if (!succeeds) {
      println("failing!")
      Future.failed(new Exception("ka-poom") with NoStackTrace)
    } else Future.successful(n)
  }

  // Waits for the traversal to finish; Await.result rethrows the exception
  // when the traversal failed.
  Await.result(result, 3.minutes)
}
package futures
import scala.collection.generic.CanBuildFrom
import scala.concurrent._
import scala.concurrent.duration._
import scala.language.higherKinds
import scala.util.{Random, Success, Try, Failure}
import scala.util.control.NoStackTrace
import scalaz._
import Scalaz._
/** A traversal that stops doing work after the first error, for any effect `F`
  * that has a `MonadError[F, Throwable]` instance.
  */
object FailFastTraverse {

  /** Applies `fn` to the elements of `in` one after another and collects the
    * results into a collection of the same shape.
    *
    * Each step binds on the accumulated effect exactly once, so `fn` runs at
    * most once per element even when `F` re-executes on every evaluation.
    * Binding on an accumulator that has raised an error never invokes the
    * continuation (assuming a lawful `MonadError`), so `fn` is not called for
    * the elements after the first failure and that error is what the result
    * carries.
    *
    * @param in  the elements to traverse, in iteration order
    * @param fn  produces the effectful result for a single element
    * @param cbf builder factory used to assemble the resulting `M[B]`
    * @param F   the `MonadError` instance used to sequence the effects
    * @return all results inside `F`, or the first error raised
    */
  def traverse[A, B, M[X] <: TraversableOnce[X], F[_]](in: M[A])(fn: A => F[B])(
      implicit cbf: CanBuildFrom[M[A], B, M[B]],
      F: MonadError[F, Throwable]): F[M[B]] = {
    type Acc = scala.collection.mutable.Builder[B, M[B]]

    val start: F[Acc] = F.point(cbf(in))
    val collected: F[Acc] = in.foldLeft(start) { (acc, a) =>
      // Reference `acc` only once: using it twice (once to inspect the outcome
      // and once to combine) would re-run the whole prefix for effects that are
      // not memoised.
      F.bind[Acc, Acc](acc) { builder =>
        F.map[B, Acc](fn(a))(b => builder += b)
      }
    }
    F.map(collected)(_.result())
  }
}
/** Manual demo of the generic traversal using `Future` as the effect: walks
  * 1 to 100 with a slow function that fails at random.
  */
object FailFastTest extends App {
  import scala.concurrent.ExecutionContext.Implicits.global

  /** Pauses the current thread for 100 ms plus a random 0 to 4999 ms. */
  def sleep(): Unit = {
    val extraMillis = Random.nextInt(5000)
    Thread.sleep(100 + extraMillis)
  }

  val result = FailFastTraverse.traverse(1 to 100) { n =>
    sleep()
    println(n)
    // Each element independently succeeds or fails on a coin flip.
    val shouldFail = !Random.nextBoolean()
    if (shouldFail) {
      println("failing!")
      Future.failed(new Exception("ka-poom") with NoStackTrace)
    } else {
      Future.successful(n)
    }
  }

  // Blocks until the traversal completes; a failed traversal makes
  // Await.result throw the underlying exception.
  Await.result(result, 3.minutes)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment