Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import akka.stream._
import akka.stream.scaladsl.FlexiMerge
import akka.stream.scaladsl.FlexiMerge.{Read, ReadAll, MergeLogic}
import scala.collection.{immutable, mutable}
class ParallelMergeLogic[A](remainingInlets: mutable.Set[Inlet[A]]) extends MergeLogic[immutable.Seq[A]] {
override def initialState = State(ReadAll(remainingInlets.toIndexedSeq)) {
case (ctx, input, inputs) =>
ctx.emit(remainingInlets.toIndexedSeq.map(inputs(_)))
SameState
}
override def initialCompletionHandling = CompletionHandling (
onUpstreamFinish = { (ctx, input) =>
remainingInlets.remove(input.asInstanceOf[Inlet[A]])
if (remainingInlets.size == 1) {
ctx.changeCompletionHandling(defaultCompletionHandling)
readLast(remainingInlets.head)
} else {
readTail(remainingInlets.toIndexedSeq)
}
},
onUpstreamFailure = { (ctx, _, cause) =>
ctx.fail(cause)
SameState
}
)
private def readTail(inlets: immutable.Seq[Inlet[A]]) = State(ReadAll(inlets)) { (ctx, input, inputs) =>
ctx.emit(inlets.map(inputs(_)).toIndexedSeq)
SameState
}
private def readLast(inlet: Inlet[A]) = State(Read(inlet)) { (ctx, input, element) =>
ctx.emit(IndexedSeq(element).toIndexedSeq)
SameState
}
}
class Parallel2Merge[A] extends FlexiMerge[immutable.Seq[A], FanInShape2[A, A, immutable.Seq[A]]](new FanInShape2("Parallel2Merge"), Attributes.name("Parallel2Merge")) {
def createMergeLogic(p: PortT) = new ParallelMergeLogic[A](mutable.Set[Inlet[A]](p.in0, p.in1))
}
class Parallel3Merge[A] extends FlexiMerge[immutable.Seq[A], FanInShape3[A, A, A, immutable.Seq[A]]](new FanInShape3("Parallel3Merge"), Attributes.name("Parallel3Merge")) {
def createMergeLogic(p: PortT) = new ParallelMergeLogic[A](mutable.Set[Inlet[A]](p.in0, p.in1, p.in2))
}
class Parallel4Merge[A] extends FlexiMerge[immutable.Seq[A], FanInShape4[A, A, A, A, immutable.Seq[A]]](new FanInShape4("Parallel4Merge"), Attributes.name("Parallel4Merge")) {
def createMergeLogic(p: PortT) = new ParallelMergeLogic[A](mutable.Set[Inlet[A]](p.in0, p.in1, p.in2, p.in3))
}
class Parallel5Merge[A] extends FlexiMerge[immutable.Seq[A], FanInShape5[A, A, A, A, A, immutable.Seq[A]]](new FanInShape5("Parallel5Merge"), Attributes.name("Parallel5Merge")) {
def createMergeLogic(p: PortT) = new ParallelMergeLogic[A](mutable.Set[Inlet[A]](p.in0, p.in1, p.in2, p.in3, p.in4))
}
class Parallel6Merge[A] extends FlexiMerge[immutable.Seq[A], FanInShape6[A, A, A, A, A, A, immutable.Seq[A]]](new FanInShape6("Parallel6Merge"), Attributes.name("Parallel6Merge")) {
def createMergeLogic(p: PortT) = new ParallelMergeLogic[A](mutable.Set[Inlet[A]](p.in0, p.in1, p.in2, p.in3, p.in4, p.in5))
}
object ParallelMerge {
def apply[T](inputSize: Int) = {
inputSize match {
case 2 => new Parallel2Merge[T]
case 3 => new Parallel3Merge[T]
case 4 => new Parallel4Merge[T]
case 5 => new Parallel5Merge[T]
case 6 => new Parallel6Merge[T]
case _ => throw new IllegalArgumentException("ParallelMerge can only be used for merging 2 - 6 inputs inclusive !!")
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment