Last active
August 7, 2017 18:48
-
-
Save russwyte/8241e1552655655ca80759ba8b75034e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.NotUsed | |
import akka.actor.ActorSystem | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import scala.concurrent.Future | |
import scala.concurrent.duration._ | |
object Example extends App { | |
implicit val system = ActorSystem("QuickStart") | |
implicit val ec = system.dispatcher | |
implicit val materializer = ActorMaterializer() | |
val sources = (1 to 100) | |
.map(n => s"step-$n") | |
.map(name => { | |
computeSource(name) | |
.initialTimeout(3.seconds) | |
.recoverWithRetries(1, { | |
case _ => | |
println(s"retrying computation $name because it timed out.") | |
computeSource(name).initialDelay(100.millis) | |
}) | |
}) | |
val f = Source | |
.zipN[Result](sources) | |
.initialTimeout(20.seconds) | |
.recover[Seq[Result]] { | |
case _ => | |
println("The whole thing timed out!!! Roll back here if applicable. :)") | |
Seq.empty | |
} | |
.runWith(Sink.foreach(x => { | |
println(s"x => $x") | |
})) | |
f.onComplete(_ => system.terminate()) | |
def computeSource(name: String): Source[Result, NotUsed] = { | |
Source.fromFuture(compute(name)).flatMapConcat { | |
case s: Success => Source.single(s) | |
case Failure => | |
println(s"retrying computation $name for bad result") | |
computeSource(name) | |
} | |
} | |
def compute(name: String): Future[Result] = Future { | |
val seconds: Int = 1 + (Math.random() * 10).toInt | |
val res = if (seconds % 2 == 0) Success(seconds) else Failure | |
println(s"$name trying seconds $seconds - which will end in $res") | |
Thread.sleep(seconds * 250) | |
res | |
} | |
sealed trait Result | |
case class Success(n: Int) extends Result | |
case object Failure extends Result | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment