Last active
February 22, 2016 15:44
-
-
Save leftofnull/3e4c2a6b18fe71d219b6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.leftofnull.streams | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} | |
import cats.data.Xor | |
import cats.data.Xor._ | |
import com.typesafe.scalalogging.LazyLogging | |
import scala.concurrent.blocking | |
import scala.language.{implicitConversions, postfixOps} | |
import scala.util.{Failure, Success, Try} | |
class Testing()(implicit val materializer: ActorMaterializer) extends TestStream { | |
// Only reason for these two is to be able to validate the counts after each test | |
var errors = List.empty[(Throwable, (String, String))] | |
var processed = List.empty[(String, String)] | |
val errorHandler = (e: (Throwable, (String, String))) => | |
blocking(errors = e :: errors) | |
val publisher = (t: (String, String)) => { | |
if (math.random < 0.5d) throw new Exception("just a test") | |
else blocking(processed = t :: processed) | |
} | |
val process = consume(errorHandler, publisher, _: String) | |
def processString(s: String): Unit = process(s).run | |
} | |
abstract class TestStream extends LazyLogging { | |
implicit val materializer: ActorMaterializer | |
type Data = (String, String) | |
type BadData = (Throwable, Data) | |
def consume( | |
errorHandler: BadData => Unit, fn: Data => Unit, a: String | |
): RunnableGraph[Unit] = RunnableGraph.fromGraph( | |
GraphDSL.create() { implicit b: GraphDSL.Builder[Unit] => | |
import GraphDSL.Implicits._ | |
val source = b.add(Source.single(a)) | |
val broadcast = b.add(Broadcast[String](2)) | |
val merge = b.add(Zip[String, String]) | |
val process = new ProcessorFlow(fn) | |
val failed = b.add(Flow[Xor[BadData, Data]].filter(x => x.isLeft)) | |
val errors = b.add(new LeftFlow[Xor[BadData, Data], BadData]( | |
(input: Xor[BadData, Data]) => { | |
input.swap.getOrElse((new Throwable, ("", ""))) | |
} | |
)) | |
val sink = b.add(Sink.foreach[BadData](errorHandler)) | |
source ~> broadcast.in | |
broadcast.out(0) ~> Flow[String].map(_.reverse) ~> merge.in0 | |
broadcast.out(1) ~> Flow[String].map("| " + _ + " |") ~> merge.in1 | |
merge.out ~> process ~> failed ~> errors ~> sink | |
ClosedShape | |
} | |
) | |
} | |
class ProcessorFlow[I](publish: I => Unit) | |
extends GraphStage[FlowShape[I, Xor[(Throwable, I), I]]] { | |
val in = Inlet[I]("Publish In") | |
val out = Outlet[Xor[(Throwable, I), I]]("Publish Out") | |
override val shape = FlowShape(in, out) | |
override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) { | |
setHandler(in, new InHandler { | |
override def onPush(): Unit = { | |
val elem = grab(in) | |
val mappedElem = Try { publish(elem) } match { | |
case Failure(x: Throwable) => left((x, elem)) | |
case Success(_) => right(elem) | |
} | |
push(out, mappedElem) | |
} | |
}) | |
setHandler(out, new OutHandler { | |
override def onPull(): Unit = pull(in) | |
}) | |
} | |
} | |
class LeftFlow[I, O](f: I => O) extends GraphStage[FlowShape[I, O]] { | |
val in = Inlet[I]("Xor in") | |
val out = Outlet[O]("Xor.left out") | |
override val shape = FlowShape(in, out) | |
override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) { | |
setHandler(in, new InHandler { | |
override def onPush(): Unit = { | |
val elem = f(grab(in)) | |
push(out, elem) | |
} | |
}) | |
setHandler(out, new OutHandler { | |
override def onPull(): Unit = { | |
pull(in) | |
} | |
}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
WARNING: I haven't tried the following code.
I think you ought to return a RunnableGraph which materializes a Future[Done] when it is done,
something like the following.
Then you can do your assertions after the Future has completed.