Skip to content

Instantly share code, notes, and snippets.

@dembol
Created September 14, 2018 15:21
Show Gist options
  • Save dembol/b69d205ca35af7ec19453e66affbb10c to your computer and use it in GitHub Desktop.
Save dembol/b69d205ca35af7ec19453e66affbb10c to your computer and use it in GitHub Desktop.
Akka grouped stage using a List
import akka.stream._
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.immutable
final case class GroupedWithList[T](n: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
require(n > 0, "n must be greater than 0")
val in = Inlet[T]("Grouped.in")
val out = Outlet[immutable.Seq[T]]("Grouped.out")
override val shape: FlowShape[T, immutable.Seq[T]] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
private val buf = {
val b = List.newBuilder[T]
b.sizeHint(n)
b
}
var left = n
override def onPush(): Unit = {
buf += grab(in)
left -= 1
if (left == 0) {
val elements = buf.result()
buf.clear()
left = n
push(out, elements)
} else {
pull(in)
}
}
override def onPull(): Unit = {
pull(in)
}
override def onUpstreamFinish(): Unit = {
// This means the buf is filled with some elements but not enough (left < n) to group together.
// Since the upstream has finished we have to push them to downstream though.
if (left < n) {
val elements = buf.result()
buf.clear()
left = n
push(out, elements)
}
completeStage()
}
setHandlers(in, out, this)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment