Skip to content

Instantly share code, notes, and snippets.

@leftofnull
Last active February 22, 2016 15:44
Show Gist options
  • Save leftofnull/3e4c2a6b18fe71d219b6 to your computer and use it in GitHub Desktop.
Save leftofnull/3e4c2a6b18fe71d219b6 to your computer and use it in GitHub Desktop.
package com.leftofnull.streams
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import cats.data.Xor
import cats.data.Xor._
import com.typesafe.scalalogging.LazyLogging
import scala.concurrent.blocking
import scala.language.{implicitConversions, postfixOps}
import scala.util.{Failure, Success, Try}
class Testing()(implicit val materializer: ActorMaterializer) extends TestStream {
// Only reason for these two is to be able to validate the counts after each test
var errors = List.empty[(Throwable, (String, String))]
var processed = List.empty[(String, String)]
val errorHandler = (e: (Throwable, (String, String))) =>
blocking(errors = e :: errors)
val publisher = (t: (String, String)) => {
if (math.random < 0.5d) throw new Exception("just a test")
else blocking(processed = t :: processed)
}
val process = consume(errorHandler, publisher, _: String)
def processString(s: String): Unit = process(s).run
}
abstract class TestStream extends LazyLogging {
implicit val materializer: ActorMaterializer
type Data = (String, String)
type BadData = (Throwable, Data)
def consume(
errorHandler: BadData => Unit, fn: Data => Unit, a: String
): RunnableGraph[Unit] = RunnableGraph.fromGraph(
GraphDSL.create() { implicit b: GraphDSL.Builder[Unit] =>
import GraphDSL.Implicits._
val source = b.add(Source.single(a))
val broadcast = b.add(Broadcast[String](2))
val merge = b.add(Zip[String, String])
val process = new ProcessorFlow(fn)
val failed = b.add(Flow[Xor[BadData, Data]].filter(x => x.isLeft))
val errors = b.add(new LeftFlow[Xor[BadData, Data], BadData](
(input: Xor[BadData, Data]) => {
input.swap.getOrElse((new Throwable, ("", "")))
}
))
val sink = b.add(Sink.foreach[BadData](errorHandler))
source ~> broadcast.in
broadcast.out(0) ~> Flow[String].map(_.reverse) ~> merge.in0
broadcast.out(1) ~> Flow[String].map("| " + _ + " |") ~> merge.in1
merge.out ~> process ~> failed ~> errors ~> sink
ClosedShape
}
)
}
class ProcessorFlow[I](publish: I => Unit)
extends GraphStage[FlowShape[I, Xor[(Throwable, I), I]]] {
val in = Inlet[I]("Publish In")
val out = Outlet[Xor[(Throwable, I), I]]("Publish Out")
override val shape = FlowShape(in, out)
override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
val mappedElem = Try { publish(elem) } match {
case Failure(x: Throwable) => left((x, elem))
case Success(_) => right(elem)
}
push(out, mappedElem)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
}
}
class LeftFlow[I, O](f: I => O) extends GraphStage[FlowShape[I, O]] {
val in = Inlet[I]("Xor in")
val out = Outlet[O]("Xor.left out")
override val shape = FlowShape(in, out)
override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = f(grab(in))
push(out, elem)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
@viktorklang
Copy link

WARNING: I haven't tried the following code.

I think you ought to return a RunnableGraph which materializes a Future[Done] when it is done,
something like the following.

Then you can do your assertions after the Future has completed.

def consume(
    errorHandler: BadData => Unit, fn: Data => Unit, a: String
  ): RunnableGraph[Future[akka.Done]] = RunnableGraph.fromGraph(
    GraphDSL.create(Sink.foreach[BadData](errorHandler)) { implicit b: GraphDSL.Builder[Unit] => sink =>
      import GraphDSL.Implicits._

      val source = b.add(Source.single(a))
      val broadcast = b.add(Broadcast[String](2))
      val merge = b.add(Zip[String, String])
      val process = new ProcessorFlow(fn)
      val failed = b.add(Flow[Xor[BadData, Data]].filter(x => x.isLeft))
      val errors = b.add(new LeftFlow[Xor[BadData, Data], BadData](
        (input: Xor[BadData, Data]) =>
          input.swap.getOrElse((new Throwable, ("", "")))
      ))

      source ~> broadcast.in
                broadcast.out(0) ~> Flow[String].map(_.reverse)       ~> merge.in0
                broadcast.out(1) ~> Flow[String].map("| " + _ + " |") ~> merge.in1
                                                                         merge.out ~> process ~> failed ~> errors ~> sink

      ClosedShape
    }
  )

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment