Skip to content

Instantly share code, notes, and snippets.

@stanislav-chetvertkov
Created April 24, 2018 13:42
Show Gist options
  • Save stanislav-chetvertkov/c3e9ed1ec82f00aff1fbc2f833892809 to your computer and use it in GitHub Desktop.
Save stanislav-chetvertkov/c3e9ed1ec82f00aff1fbc2f833892809 to your computer and use it in GitHub Desktop.
feedback loop graph
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, ClosedShape, OverflowStrategy}
/**
 * Runnable demo of a retry-style feedback loop built with the Akka Streams GraphDSL.
 *
 * The numbers 1 to 10 pass through a merge stage into a stateful mapper that tags each
 * element as `Right` (accepted) or `Left` (retry). The mapper emits `Left` only for the
 * first occurrence of the value 3; every other element, including the re-submitted 3,
 * is emitted as `Right`. A partition stage sends `Right` values to a printing sink and
 * routes `Left` values back into the second input of the merge stage.
 *
 * NOTE(review): the merge stage is part of the cycle and is created without eager
 * completion, and the actor system is never terminated here, so the application
 * presumably keeps running after all elements were printed — confirm this is intended.
 */
object GraphApp extends App {
  implicit val actorSystem: ActorSystem = ActorSystem(name = "prog-voice")
  implicit val mt: ActorMaterializer = ActorMaterializer()

  val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Upstream numbers behind a one-element buffer configured with OverflowStrategy.fail.
    val in = Source(1 to 10).buffer(1, OverflowStrategy.fail)
    // Prints every accepted element without a separator.
    val out = Sink.foreach[Int](value => print(value))

    // Pass-through stage that prints each tagged element before it is routed.
    val tracer = builder.add(Flow[Either[Int, Int]].map { tagged =>
      print(s"flow:$tagged")
      tagged
    })

    // Tags elements; the state is created once per materialization of the flow.
    val mapper = builder.add(Flow[Int].statefulMapConcat[Either[Int, Int]] { () =>
      // Flips to true after the value 3 has been sent back for a retry once.
      var retried = false

      element =>
        if (element == 3 && !retried) {
          retried = true
          List(Left(element))
        } else {
          List(Right(element))
        }
    })

    // Input 0 receives fresh elements, input 1 receives elements fed back for a retry.
    val merge = builder.add(Merge[Int](2))

    // Output 0 carries accepted (Right) elements, output 1 carries retries (Left).
    val partition = builder.add(Partition[Either[Int, Int]](2, {
      case Right(_) => 0
      case Left(_)  => 1
    }))

    in ~> merge.in(0)
    merge.out ~> mapper ~> tracer ~> partition.in
    // `collect` unwraps the values with total pattern matches instead of the
    // partial (and deprecated) `.right.get` / `.left.get` projections.
    partition.out(0).collect { case Right(accepted) => accepted } ~> out
    partition.out(1).collect { case Left(retry) => retry } ~> merge.in(1)

    ClosedShape
  })

  graph.run()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment