Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Blog: GraphStage emit and friends
package blog
import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
class MaxStage extends GraphStage[FlowShape[Int, Int]] {
val in: Inlet[Int] = Inlet("MaxStage.in")
val out: Outlet[Int] = Outlet("MaxStage.out")
override val shape: FlowShape[Int, Int] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
var maxValue = Int.MinValue
var maxPushed = Int.MinValue
setHandler(in, new InHandler {
override def onPush(): Unit = {
maxValue = math.max(maxValue, grab(in))
if (isAvailable(out) && maxValue > maxPushed)
pushMaxValue()
pull(in)
}
override def onUpstreamFinish(): Unit = {
if (maxValue > maxPushed)
pushMaxValue()
completeStage()
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (maxValue > maxPushed)
pushMaxValue()
else if (!hasBeenPulled(in))
pull(in)
}
})
def pushMaxValue(): Unit = {
maxPushed = maxValue
push(out, maxPushed)
}
}
}
package blog
import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
class MaxStage2 extends GraphStage[FlowShape[Int, Int]] {
val in: Inlet[Int] = Inlet("MaxStage.in")
val out: Outlet[Int] = Outlet("MaxStage.out")
override val shape: FlowShape[Int, Int] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
var maxValue = Int.MinValue
var maxPushed = Int.MinValue
var finishing = false
setHandler(in, new InHandler {
override def onPush(): Unit = {
maxValue = math.max(maxValue, grab(in))
if (isAvailable(out) && maxValue > maxPushed)
pushMaxValue()
pull(in)
}
override def onUpstreamFinish(): Unit = {
if (maxValue > maxPushed) {
if (isAvailable(out)) {
pushMaxValue()
completeStage()
} else {
// push final value and complete stage in onPull
finishing = true
}
} else {
completeStage()
}
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (maxValue > maxPushed) {
pushMaxValue()
if (finishing)
completeStage()
} else if (!hasBeenPulled(in))
pull(in)
}
})
def pushMaxValue(): Unit = {
maxPushed = maxValue
push(out, maxPushed)
}
}
}
package blog
import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
class MaxStage3 extends GraphStage[FlowShape[Int, Int]] {
val in: Inlet[Int] = Inlet("MaxStage.in")
val out: Outlet[Int] = Outlet("MaxStage.out")
override val shape: FlowShape[Int, Int] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
var maxValue = Int.MinValue
var maxPushed = Int.MinValue
setHandler(in, new InHandler {
override def onPush(): Unit = {
maxValue = math.max(maxValue, grab(in))
if (isAvailable(out) && maxValue > maxPushed)
pushMaxValue()
pull(in)
}
override def onUpstreamFinish(): Unit = {
if (maxValue > maxPushed)
emit(out, maxValue)
completeStage()
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (maxValue > maxPushed)
pushMaxValue()
else if (!hasBeenPulled(in))
pull(in)
}
})
def pushMaxValue(): Unit = {
maxPushed = maxValue
push(out, maxPushed)
}
}
}
package blog
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.scaladsl.TestSource
import org.scalatest.BeforeAndAfterAll
import org.scalatest.Matchers
import org.scalatest.WordSpec
class MaxStageSpec extends WordSpec with Matchers with BeforeAndAfterAll {
implicit val system = ActorSystem("MaxLimitSpec")
implicit val mat = ActorMaterializer()
"MaxStage" should {
"emit max value as known so far" in {
val (upstream, downstream) =
TestSource.probe[Int]
.via(new MaxStage3)
.toMat(TestSink.probe)(Keep.both)
.run()
// send element 10 from upstream
upstream.sendNext(10)
downstream.request(1)
// and it is received by downstream
downstream.expectNext(10)
downstream.request(1)
upstream.sendNext(9)
upstream.sendNext(8)
// no new max yet since 9 and 8 are < 10
downstream.expectNoMsg(200.millis)
upstream.sendNext(11)
// new max emitted by the stage
downstream.expectNext(11)
upstream.sendNext(17)
// end the stream
upstream.sendComplete()
// no request from downstream yet
downstream.expectNoMsg(200.millis)
downstream.request(1)
// get the final element
downstream.expectNext(17)
downstream.expectComplete()
}
}
override protected def afterAll() = {
Await.ready(system.terminate(), 10.seconds)
super.afterAll()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment