Skip to content

Instantly share code, notes, and snippets.

@hussachai
Last active June 13, 2017 23:45
Show Gist options
  • Save hussachai/a3266d3a4298493f857f593c304c9388 to your computer and use it in GitHub Desktop.
Save hussachai/a3266d3a4298493f857f593c304c9388 to your computer and use it in GitHub Desktop.
Bad snippet! Don't do it.
object BadBidiFlow extends App {
implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
val source = Source.fromIterator(() => Seq("1", "2a", "3").toIterator)
val sink = Sink.foreach(println)
val endFlow = Flow.fromFunction[String, (String, Try[Int])]{ a => (a, Try(a.toInt)) }
val graph = BidiFlow.fromGraph(new BidiRetryGraph).join(endFlow)
val runnableGraph = source.via(graph).to(sink)
runnableGraph.run()
class BidiRetryGraph extends GraphStage[BidiShape[String, String, (String, Try[Int]), Try[Int]]] {
val in0 = Inlet[String]("in0")
val out0 = Outlet[String]("out0")
val in1 = Inlet[(String, Try[Int])]("in1")
val out1 = Outlet[Try[Int]]("out1")
override def shape: BidiShape[String, String, (String, Try[Int]), Try[Int]] = BidiShape.of(in0, out0, in1, out1)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandler(in0, new InHandler(){
override def onPush(): Unit = {
push(out0, grab(in0))
}
})
setHandler(out0, new OutHandler(){
override def onPull(): Unit = {
pull(in0)
}
})
setHandler(in1, new InHandler(){
override def onPush(): Unit = {
val res = grab(in1)
res._2 match {
case Success(_) =>
push(out1, res._2)
case Failure(e) =>
val n = res._1.filter(c => c.isDigit).mkString
push(out0, n) // requirement failed: Cannot push port (out0) twice
}
}
})
setHandler(out1, new OutHandler(){
override def onPull(): Unit = {
pull(in1)
}
})
}
}
}
@hussachai
Copy link
Author

           +----------+           +----------+
           |          |           |          |
----------->   |----->------------>          |
           |   |      |           |          |
           |   |      |           |          |
           |   |      |           |          |
<-----------   |-------<-----------          |
           |          |           |          |
           +----------+           +----------+

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment