Skip to content

Instantly share code, notes, and snippets.

@ecyshor
Last active July 30, 2020 15:09
Show Gist options
  • Save ecyshor/b904491d21ae3a3683dd99f339de3442 to your computer and use it in GitHub Desktop.
Save ecyshor/b904491d21ae3a3683dd99f339de3442 to your computer and use it in GitHub Desktop.
akka streams RestartFlow playground
import akka.actor.ActorSystem
import akka.stream.ActorAttributes
import akka.stream.Supervision.Stop
import akka.stream.scaladsl.{ Flow, RestartFlow, Sink, Source }
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NonFatal
import scala.concurrent.Future
import scala.util.Random
/** Playground for `RestartFlow.onFailuresWithBackoff`.
  *
  * Streams the integers 1 to 10 through a flow that randomly succeeds or
  * fails per element. The inner flow's supervision strategy logs the error
  * and returns `Stop`; the surrounding `RestartFlow` is configured with
  * backoff and `maxRestarts = -1`. Elements that reach the sink are printed
  * as "FINISHED", errors seen by the supervision decider as "LOST".
  *
  * The main thread blocks for at most 10 seconds on stream completion; if
  * the stream has not completed by then, `Await.result` throws a
  * `TimeoutException`.
  */
object MyApp extends App {
  implicit val as: ActorSystem = ActorSystem()
  val parallelism = 10

  try {
    Await.result(
      Source
        .fromIterator(() => (1 to 10).iterator)
        .via(
          RestartFlow.onFailuresWithBackoff(
            minBackoff = 1.second,
            maxBackoff = 2.seconds,
            randomFactor = 0.2,
            maxRestarts = -1
          ) { () =>
            Flow[Int]
              .mapAsync(parallelism) { el =>
                if (Random.nextBoolean()) Future.successful(el)
                // Pass the exception instance: writing `throw` inside the argument
                // would raise it synchronously before Future.failed is ever called.
                else Future.failed(new IllegalArgumentException("WOOOO"))
              }
              .addAttributes(ActorAttributes.supervisionStrategy {
                case NonFatal(e) =>
                  println(s"LOST ${e}")
                  Stop
              })
          }
        )
        .runWith(Sink.foreach(el => println(s"FINISHED $el"))),
      10.seconds
    )
  } finally {
    // Shut the actor system down on success, failure, or timeout so the
    // program does not leave it running after the stream is done.
    as.terminate()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment