Skip to content

Instantly share code, notes, and snippets.

@regis-leray
Last active October 14, 2016 13:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save regis-leray/013dfe030159bcd890ca0d5cd440c938 to your computer and use it in GitHub Desktop.
Save regis-leray/013dfe030159bcd890ca0d5cd440c938 to your computer and use it in GitHub Desktop.
Valve for Akka stream
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
/**
 * Handle used to operate a valve from outside the stream.
 *
 * Both operations are invoked purely for their effect and return `Unit`.
 */
trait ValveSwitch {

  /** Switches the valve to the open mode. */
  def open: Unit

  /** Switches the valve to the closed mode. */
  def close: Unit

}
/**
 * A pass-through flow stage that can be opened and closed at runtime through
 * its materialized [[ValveSwitch]].
 *
 * While the valve is open every element received on the inlet is pushed
 * straight to the outlet. While it is closed the element that was already
 * requested by downstream is held back (at most one element is buffered) and
 * no further element is pulled from upstream until the valve is opened again
 * and the held element has been emitted.
 *
 * @param mode the mode the valve starts in; defaults to [[ValveMode.Open]]
 * @tparam A the type of the elements flowing through the valve
 */
class Valve[A](mode: ValveMode = ValveMode.Open) extends GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {

  override val shape: FlowShape[A, A] = FlowShape(Inlet[A]("valve.in"), Outlet[A]("valve.out"))

  /** Creates the stage logic and exposes its switch as the materialized value. */
  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ValveSwitch) = {
    val logic = new ValveGraphStageLogic(shape, mode)
    (logic, logic.switch)
  }

  /**
   * Stage logic of the valve.
   *
   * `currentMode` and `bufferedElement` are only read and written from stage
   * callbacks (`onPush`, `onUpstreamFinish` and the async callback handler),
   * never directly from the thread that calls the switch.
   */
  private class ValveGraphStageLogic(shape: FlowShape[A, A], initialMode: ValveMode)
      extends GraphStageLogic(shape) with InHandler with OutHandler {
    import shape._

    private var currentMode: ValveMode = initialMode

    // Element grabbed while the valve was closed, waiting to be emitted on open.
    private var bufferedElement = Option.empty[A]

    // Mode changes are handed over to the stage through an async callback so
    // that the switch can be flipped from any thread without racing onPush.
    // NOTE(review): on Akka versions where an async callback cannot be invoked
    // before the stage has started, flipping the switch immediately after
    // materialization may fail — confirm against the Akka version in use.
    private val modeChange = getAsyncCallback[ValveMode](switchTo)

    val switch: ValveSwitch = new ValveSwitch {
      override def open: Unit = modeChange.invoke(ValveMode.Open)
      override def close: Unit = modeChange.invoke(ValveMode.Closed)
    }

    setHandlers(in, out, this)

    /** Applies a mode change inside the stage and releases the held element when opening. */
    private def switchTo(newMode: ValveMode): Unit = {
      currentMode = newMode
      if (currentMode == ValveMode.Open) emitBuffered()
    }

    /**
     * Pushes the held element, if any. An element is only ever held after a
     * downstream pull that has not been answered yet, so the outlet still has
     * outstanding demand at this point.
     */
    private def emitBuffered(): Unit =
      bufferedElement.foreach { element =>
        bufferedElement = None
        push(out, element)
        // Upstream finished while the element was held back: finish now that
        // nothing is pending any more.
        if (isClosed(in)) completeStage()
      }

    override def onPush(): Unit = {
      val element = grab(in) // acquires the element that has been received during an onPush
      if (currentMode == ValveMode.Open) push(out, element) // push directly the element on the out port
      else bufferedElement = Some(element) // keep it until the valve is opened
    }

    override def onPull(): Unit = pull(in) // request the next element on in port

    // Do not complete while an element is still held back, otherwise that
    // element would be dropped; completion then happens in emitBuffered.
    override def onUpstreamFinish(): Unit =
      if (bufferedElement.isEmpty) completeStage()
  }
}
/**
 * Mode of a valve: either [[ValveMode.Open]] or [[ValveMode.Closed]].
 *
 * The trait is sealed so that the compiler can check pattern matches on the
 * mode for exhaustiveness.
 */
sealed trait ValveMode extends Product with Serializable

object ValveMode {

  /** Elements are passed through the valve. */
  case object Open extends ValveMode

  /** Elements are held back by the valve. */
  case object Closed extends ValveMode

}
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest._
import org.scalatest.Matchers._
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.Future
import scala.concurrent.duration._
/**
 * Behavioural tests for the valve stage: a valve that starts closed and is
 * opened later, and a valve that starts open and is closed and reopened.
 */
class ValveSpec extends FlatSpec with ScalaFutures with BeforeAndAfterAll {

  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: scala.concurrent.ExecutionContext = materializer.executionContext

  // Shut the actor system down once all tests have run so that its threads
  // do not outlive the suite.
  override protected def afterAll(): Unit =
    try scala.concurrent.Await.ready(system.terminate(), 10.seconds)
    finally super.afterAll()

  "A closed valve" should "emit all elements once it has been opened" in {
    val (valve, seq) = Source(1 to 3)
      .viaMat(new Valve(ValveMode.Closed))(Keep.right)
      .toMat(Sink.seq)(Keep.both)
      .run()

    after(100.millis, system.scheduler) {
      Future.successful(valve.open)
    }

    // The valve is only opened after 100 ms, so leave a comfortable margin
    // before giving up on the result.
    whenReady(seq, timeout(1.second)) { elements =>
      elements shouldEqual Seq(1, 2, 3)
    }
  }

  it should "emit all 5 elements, following demand, after it has been opened" in {
    val (valve, probe) = Source(1 to 5)
      .viaMat(new Valve(ValveMode.Closed))(Keep.right)
      .toMat(TestSink.probe[Int])(Keep.both)
      .run()

    // Nothing may be emitted while the valve is still closed.
    probe.request(2)
    probe.expectNoMsg(100.millis)

    valve.open
    probe.expectNext() shouldEqual 1
    probe.expectNext() shouldEqual 2

    probe.request(3)
    probe.expectNext() shouldEqual 3
    probe.expectNext() shouldEqual 4
    probe.expectNext() shouldEqual 5

    probe.expectComplete()
  }

  "An open valve" should "emit 5 elements after it has been closed and reopened" in {
    val (valve, probe) = Source(1 to 5)
      .viaMat(new Valve())(Keep.right)
      .toMat(TestSink.probe[Int])(Keep.both)
      .run()

    probe.request(2)
    probe.expectNext() shouldEqual 1
    probe.expectNext() shouldEqual 2

    // Demand signalled while the valve is closed must not produce an element.
    valve.close
    probe.request(1)
    probe.expectNoMsg(100.millis)

    valve.open
    probe.expectNext() shouldEqual 3

    probe.request(2)
    probe.expectNext() shouldEqual 4
    probe.expectNext() shouldEqual 5

    probe.expectComplete()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment