Skip to content

Instantly share code, notes, and snippets.

@patriknw
Created June 13, 2018 15:22
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save patriknw/400f02cfdb5f03e66ca3c9c2f226390a to your computer and use it in GitHub Desktop.
Save patriknw/400f02cfdb5f03e66ca3c9c2f226390a to your computer and use it in GitHub Desktop.
Prototype LinearStage
/**
* Copyright (C) 2018 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.stage
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import akka.NotUsed
import akka.annotation.InternalApi
import akka.event.LoggingAdapter
import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Graph
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.impl.Buffer
/**
 * Entry point for turning a [[LinearStageLogic]] into a graph that can be plugged into a stream.
 */
object LinearStage {

  /**
   * Wraps a logic factory in a one-in/one-out graph.
   *
   * @param factory creates the stage logic; it is invoked by the wrapping stage each time that
   *                stage is asked to create its logic
   * @tparam In  element type accepted by the stage
   * @tparam Out element type emitted by the stage
   * @tparam M   materialized value type, taken from the created logic
   * @return a graph with a [[FlowShape]] backed by the given factory
   */
  def flow[In, Out, M](factory: () => LinearStageLogic[In, Out, M]): Graph[FlowShape[In, Out], M] = {
    val stage: Graph[FlowShape[In, Out], M] = new LinearGraphStageImpl(factory)
    stage
  }
}
/**
 * Simplified stage logic for stages with exactly one input and one output.
 *
 * Implementations react to incoming elements in [[onPush(elem:In)*]] and produce output with
 * [[emit]] / [[emitMultiple]]. Elements that cannot be pushed immediately are kept in a bounded
 * output buffer of [[outBufferSize]] elements and are drained on downstream demand.
 *
 * Termination of upstream (completion or failure) is deferred until the output buffer has been
 * drained, so already emitted elements are not lost.
 */
abstract class LinearStageLogic[In, Out, M](shape: FlowShape[In, Out])
  extends TimerGraphStageLogic(shape)
  with InHandler with OutHandler {

  // it's ok to create a new FlowShape(in, Out) here because constructor in GraphStageLogic is only using
  // the size of inlets and outlets. The actual in and out are set via `internalSetInOut`
  def this() = this(FlowShape(Inlet[In]("in"), Outlet[Out]("out")))

  private var in: Inlet[In] = _
  private var out: Outlet[Out] = _

  // Could use GraphStageLogic.emit instead of own buffer, but I think there should be a limit
  private var outBuffer: Buffer[Out] = _

  // Capacity requested by the implementation, captured once in preStart.
  private var outBufferCapacity: Int = 0

  // Set when upstream terminated while elements were still buffered.
  private var upstreamTerminated: Boolean = false

  // Upstream failure that must be signalled once the buffer has been drained.
  private var pendingFailure: Option[Throwable] = None

  /**
   * Assigns the real ports of the enclosing stage and registers this logic as the handler of both.
   * Without the registration the ports would have no handler when the stage is materialized.
   */
  def internalSetInOut(i: Inlet[In], o: Outlet[Out]): Unit = {
    in = i
    out = o
    setHandlers(i, o, this)
  }

  /**
   * Called when the input port has a new element available. The element has already been grabbed
   * from the port and is passed in as `elem`; use [[emit]] or [[emitMultiple]] to produce output.
   */
  @throws(classOf[Exception])
  def onPush(elem: In): Unit

  /** Maximum number of emitted elements that may wait for downstream demand. Must not be negative. */
  def outBufferSize: Int

  /** The materialized value exposed by the stage built from this logic. */
  def materializedValue: M

  override def preStart(): Unit = {
    val requested = outBufferSize
    require(requested >= 0, s"outBufferSize must not be negative, was $requested")
    outBufferCapacity = requested
    // A capacity of 0 ("no buffering") is enforced by this class itself; the backing buffer is
    // always allocated with at least one slot. NOTE(review): the underlying buffer implementation
    // is assumed to reject non-positive sizes - confirm.
    outBuffer = Buffer(math.max(1, requested), materializer)
  }

  override def onPush(): Unit = {
    onPush(grab(in))
    if (isAvailable(out))
      tryPull(in) // in case it was filtered, and not emitting anything
  }

  override def onPull(): Unit = {
    if (outBuffer.nonEmpty) {
      push(out, outBuffer.dequeue())
      // Signal a deferred termination right away instead of waiting for one more pull.
      if (outBuffer.isEmpty && upstreamTerminated)
        finishAfterDrain()
    } else if (upstreamTerminated || isClosed(in))
      finishAfterDrain()
    else
      tryPull(in)
  }

  /**
   * Emits one element downstream. The element is pushed directly when nothing is buffered and
   * downstream has demand, otherwise it is buffered. Elements emitted after downstream has
   * cancelled are dropped.
   *
   * @throws IllegalStateException if the element must be buffered and the buffer is full
   */
  final def emit(elem: Out): Unit =
    if (!isClosed(out)) {
      if (outBuffer.isEmpty && isAvailable(out))
        push(out, elem)
      else
        enqueueOrFail(elem)
    }

  /**
   * Emits several elements downstream in iteration order. At most the first element is pushed
   * directly (when nothing is buffered and downstream has demand); the rest are buffered.
   * Elements emitted after downstream has cancelled are dropped.
   *
   * @throws IllegalStateException if an element must be buffered and the buffer is full
   */
  final def emitMultiple(elems: immutable.Iterable[Out]): Unit =
    if (!isClosed(out)) {
      val iter = elems.iterator
      if (iter.hasNext && outBuffer.isEmpty && isAvailable(out))
        push(out, iter.next())
      while (iter.hasNext)
        enqueueOrFail(iter.next())
    }

  /** `true` when no further element can be buffered. */
  def isOutBufferFull: Boolean =
    outBuffer.used >= outBufferCapacity

  override def postStop(): Unit = {
    // preStart may not have run (or may have failed) before the stage is stopped
    if (outBuffer ne null)
      outBuffer.clear()
    super.postStop()
  }

  override def onUpstreamFinish(): Unit = {
    if (outBuffer.isEmpty)
      super.onUpstreamFinish()
    else
      upstreamTerminated = true // complete once the buffer has been drained
  }

  override def onUpstreamFailure(ex: Throwable): Unit = {
    if (outBuffer.isEmpty)
      super.onUpstreamFailure(ex)
    else {
      // keep the failure so it is not turned into a normal completion after draining
      upstreamTerminated = true
      pendingFailure = Some(ex)
    }
  }

  // Buffers the element, failing loudly instead of overflowing the backing buffer.
  private def enqueueOrFail(elem: Out): Unit = {
    if (outBuffer.used >= outBufferCapacity)
      throw new IllegalStateException(
        s"LinearStageLogic output buffer overflow (outBufferSize = $outBufferCapacity)")
    outBuffer.enqueue(elem)
  }

  // Terminates the stage after the buffer is empty, preserving a deferred upstream failure.
  private def finishAfterDrain(): Unit =
    pendingFailure match {
      case Some(ex) => failStage(ex)
      case None     => completeStage()
    }
}
/**
 * INTERNAL API
 *
 * Graph stage with a single inlet and a single outlet whose logic is produced by `factory`.
 * The materialized value is read from the created logic.
 */
@InternalApi private[akka] class LinearGraphStageImpl[In, Out, M](factory: () => LinearStageLogic[In, Out, M])
  extends GraphStageWithMaterializedValue[FlowShape[In, Out], M] {

  override val shape: FlowShape[In, Out] = FlowShape(Inlet[In]("in"), Outlet[Out]("out"))

  override def initialAttributes: Attributes = Attributes.name("LinearStage") // FIXME use DefaultAttributes

  /**
   * Creates a fresh logic instance from the factory, hands it the ports of this stage's shape
   * and returns it together with its materialized value.
   */
  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, M) = {
    val stageLogic = factory()
    // the ports must be assigned before the materialized value is read
    stageLogic.internalSetInOut(shape.in, shape.out)
    val matValue = stageLogic.materializedValue
    (stageLogic, matValue)
  }
}
/**
 * Example stages written against [[LinearStageLogic]], plus a small stream wiring them together.
 */
object UsageDemo {
  import akka.stream.scaladsl.Source
  import akka.stream.scaladsl.Sink
  import akka.stream.scaladsl.Flow
  import akka.stream.Materializer

  /** Emits `f(elem)` for every incoming element. */
  class Map[A, B](f: A => B) extends LinearStageLogic[A, B, NotUsed] {
    override def outBufferSize: Int = 0
    override def materializedValue: NotUsed = NotUsed

    override def onPush(elem: A): Unit =
      emit(f(elem))
  }

  /** Emits only the elements for which the predicate `p` holds. */
  class Filter[A](p: A => Boolean) extends LinearStageLogic[A, A, NotUsed] {
    override def outBufferSize: Int = 0
    override def materializedValue: NotUsed = NotUsed

    override def onPush(elem: A): Unit =
      if (p(elem)) emit(elem)
  }

  /** Emits every incoming element twice. */
  class Duplicator[A] extends LinearStageLogic[A, A, NotUsed] {
    override def outBufferSize: Int = 1
    override def materializedValue: NotUsed = NotUsed

    override def onPush(elem: A): Unit =
      emitMultiple(List(elem, elem))
  }

  // each time an event is pushed through it will trigger a period of silence
  class TimedGate[A](silencePeriod: FiniteDuration) extends LinearStageLogic[A, A, NotUsed] {
    // NOTE: `true` means a silence period is in progress and incoming elements are discarded
    var open: Boolean = false

    override def outBufferSize: Int = 0
    override def materializedValue: NotUsed = NotUsed

    override def onPush(elem: A): Unit =
      if (!open) {
        emit(elem)
        open = true
        // the timer ends the silence period, see onTimer
        scheduleOnce(None, silencePeriod)
      }

    override def onTimer(timerKey: Any): Unit =
      open = false
  }

  //https://akka.io/blog/2016/10/21/emit-and-friends
  /** Emits an element only when it is greater than every element emitted so far. */
  class Max extends LinearStageLogic[Int, Int, NotUsed] {
    var maxValue: Int = Int.MinValue
    var maxPushed: Int = Int.MinValue

    override def outBufferSize: Int = 0
    override def materializedValue: NotUsed = NotUsed

    override def onPush(elem: Int): Unit = {
      if (elem > maxValue)
        maxValue = elem
      if (maxPushed < maxValue) {
        maxPushed = maxValue
        emit(maxValue)
      }
    }
  }

  // placeholder: a real materializer has to be provided before the stream below can run
  implicit val mat: Materializer = ???

  val resultFuture =
    Source(1 to 5)
      .via(LinearStage.flow(() => new Filter(_ % 2 == 0)))
      .via(LinearStage.flow(() => new Duplicator()))
      .via(LinearStage.flow(() => new Map(_ / 2)))
      .runWith(Sink.ignore)
}
@2m
Copy link

2m commented Jun 19, 2018

Looking good! I would like to try and remove

    override def outBufferSize: Int = 1

    override def materializedValue: NotUsed = NotUsed

from the user facing API as well.

Also, a default constructor for LinearStageLogic that requires a stage name would be great, to enforce adding a name to every custom stage.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment