Last active
October 14, 2016 13:54
-
-
Save regis-leray/013dfe030159bcd890ca0d5cd440c938 to your computer and use it in GitHub Desktop.
Valve for Akka stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler} | |
import akka.stream.{Attributes, FlowShape, Inlet, Outlet} | |
trait ValveSwitch { | |
def open: Unit | |
def close: Unit | |
} | |
class Valve[A](mode: ValveMode = ValveMode.Open) extends GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] { | |
override val shape = FlowShape(Inlet[A]("valve.in"), Outlet[A]("valve.out")) | |
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ValveSwitch) = { | |
val logic = new ValveGraphStageLogic(shape, mode) | |
(logic, logic.switch) | |
} | |
private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) extends GraphStageLogic(shape) with InHandler with OutHandler{ | |
import shape._ | |
var bufferedElement = Option.empty[A] | |
val switch = new ValveSwitch { | |
val callback = getAsyncCallback[A](push(out, _)) | |
override def open: Unit = { | |
mode = ValveMode.Open | |
bufferedElement.foreach(callback.invoke) | |
bufferedElement = Option.empty | |
} | |
override def close: Unit = { | |
mode = ValveMode.Closed | |
} | |
} | |
setHandlers(in,out, this) | |
override def onPush(): Unit = { | |
val element = grab(in) //acquires the element that has been received during an onPush | |
if (mode == ValveMode.Open) { | |
push(out, element) //push directly the element on the out port | |
} else { | |
bufferedElement = Some(element) | |
} | |
} | |
override def onPull(): Unit = pull(in) //request the next element on in port | |
} | |
} | |
trait ValveMode | |
object ValveMode { | |
case object Open extends ValveMode | |
case object Closed extends ValveMode | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem | |
import akka.pattern.after | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl.{Keep, Sink, Source} | |
import akka.stream.testkit.scaladsl.TestSink | |
import org.scalatest._ | |
import org.scalatest.Matchers._ | |
import org.scalatest.concurrent.ScalaFutures | |
import scala.concurrent.Future | |
import scala.concurrent.duration._ | |
class ValveSpec extends FlatSpec with ScalaFutures { | |
implicit val system = ActorSystem() | |
implicit val materializer = ActorMaterializer() | |
implicit val executionContext = materializer.executionContext | |
"A closed valve" should "emit only " in { | |
val (valve, seq) = Source(1 to 3) | |
.viaMat(new Valve(ValveMode.Closed))(Keep.right) | |
.toMat(Sink.seq)(Keep.both) | |
.run() | |
after(100 millis, system.scheduler) { | |
Future.successful(valve.open) | |
} | |
whenReady(seq, timeout(200 millis)){ sum => sum should contain inOrder (1, 2, 3) } | |
} | |
"A closed valve" should "emit only 5 elements after it has been open" in { | |
val (valve, probe) = Source(1 to 5) | |
.viaMat(new Valve(ValveMode.Closed))(Keep.right) | |
.toMat(TestSink.probe[Int])(Keep.both) | |
.run() | |
probe.request(2) | |
probe.expectNoMsg(100 millis) | |
valve.open | |
probe.expectNext shouldEqual 1 | |
probe.expectNext shouldEqual 2 | |
probe.request(3) | |
probe.expectNext shouldEqual 3 | |
probe.expectNext shouldEqual 4 | |
probe.expectNext shouldEqual 5 | |
probe.expectComplete() | |
} | |
it should "emit 5 elements after it has been open/close/open" in { | |
val (valve, probe) = Source(1 to 5) | |
.viaMat(new Valve())(Keep.right) | |
.toMat(TestSink.probe[Int])(Keep.both) | |
.run() | |
probe.request(2) | |
probe.expectNext() shouldEqual 1 | |
probe.expectNext() shouldEqual 2 | |
valve.close | |
probe.request(1) | |
probe.expectNoMsg(100 millis) | |
valve.open | |
probe.expectNext() shouldEqual 3 | |
probe.request(2) | |
probe.expectNext() shouldEqual 4 | |
probe.expectNext() shouldEqual 5 | |
probe.expectComplete() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment