Created
September 14, 2018 15:21
-
-
Save dembol/b69d205ca35af7ec19453e66affbb10c to your computer and use it in GitHub Desktop.
Akka grouped stage using a List
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.stream._ | |
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} | |
import scala.collection.immutable | |
final case class GroupedWithList[T](n: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { | |
require(n > 0, "n must be greater than 0") | |
val in = Inlet[T]("Grouped.in") | |
val out = Outlet[immutable.Seq[T]]("Grouped.out") | |
override val shape: FlowShape[T, immutable.Seq[T]] = FlowShape(in, out) | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { | |
private val buf = { | |
val b = List.newBuilder[T] | |
b.sizeHint(n) | |
b | |
} | |
var left = n | |
override def onPush(): Unit = { | |
buf += grab(in) | |
left -= 1 | |
if (left == 0) { | |
val elements = buf.result() | |
buf.clear() | |
left = n | |
push(out, elements) | |
} else { | |
pull(in) | |
} | |
} | |
override def onPull(): Unit = { | |
pull(in) | |
} | |
override def onUpstreamFinish(): Unit = { | |
// This means the buf is filled with some elements but not enough (left < n) to group together. | |
// Since the upstream has finished we have to push them to downstream though. | |
if (left < n) { | |
val elements = buf.result() | |
buf.clear() | |
left = n | |
push(out, elements) | |
} | |
completeStage() | |
} | |
setHandlers(in, out, this) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment