Skip to content

Instantly share code, notes, and snippets.

@russwyte
Last active August 7, 2017 18:48
Show Gist options
  • Save russwyte/8241e1552655655ca80759ba8b75034e to your computer and use it in GitHub Desktop.
Save russwyte/8241e1552655655ca80759ba8b75034e to your computer and use it in GitHub Desktop.
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.Future
import scala.concurrent.duration._
object Example extends App {
implicit val system = ActorSystem("QuickStart")
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
val sources = (1 to 100)
.map(n => s"step-$n")
.map(name => {
computeSource(name)
.initialTimeout(3.seconds)
.recoverWithRetries(1, {
case _ =>
println(s"retrying computation $name because it timed out.")
computeSource(name).initialDelay(100.millis)
})
})
val f = Source
.zipN[Result](sources)
.initialTimeout(20.seconds)
.recover[Seq[Result]] {
case _ =>
println("The whole thing timed out!!! Roll back here if applicable. :)")
Seq.empty
}
.runWith(Sink.foreach(x => {
println(s"x => $x")
}))
f.onComplete(_ => system.terminate())
def computeSource(name: String): Source[Result, NotUsed] = {
Source.fromFuture(compute(name)).flatMapConcat {
case s: Success => Source.single(s)
case Failure =>
println(s"retrying computation $name for bad result")
computeSource(name)
}
}
def compute(name: String): Future[Result] = Future {
val seconds: Int = 1 + (Math.random() * 10).toInt
val res = if (seconds % 2 == 0) Success(seconds) else Failure
println(s"$name trying seconds $seconds - which will end in $res")
Thread.sleep(seconds * 250)
res
}
sealed trait Result
case class Success(n: Int) extends Result
case object Failure extends Result
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment