Created
January 16, 2016 15:11
-
-
Save l15k4/922fbf4d65f87b1aa49b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.stream._ | |
import akka.stream.scaladsl.FlexiMerge | |
import akka.stream.scaladsl.FlexiMerge.{Read, ReadAll, MergeLogic} | |
import scala.collection.{immutable, mutable} | |
/**
 * Merge logic that reads one element from every remaining inlet and emits the elements
 * together as a single `immutable.Seq[A]`.
 *
 * When an upstream finishes, its inlet is removed from `remainingInlets` and the logic
 * switches to a state that reads only the inlets that are left. Once exactly one inlet
 * remains, the default completion handling is installed and each element of that inlet is
 * emitted as a one-element sequence.
 *
 * The order of the elements inside an emitted sequence is the iteration order of
 * `remainingInlets` at the time the current state was created.
 *
 * @param remainingInlets inlets to read from; the set is mutated by this logic as upstreams
 *                        finish, so it should not be shared with other instances
 * @tparam A element type of every inlet
 */
class ParallelMergeLogic[A](remainingInlets: mutable.Set[Inlet[A]]) extends MergeLogic[immutable.Seq[A]] {

  /** Starts by reading from all inlets the logic was created with. */
  override def initialState: State[_] = readTail(remainingInlets.toIndexedSeq)

  /**
   * Completion handling that narrows the set of inlets being read whenever an upstream
   * finishes, and fails the stage when an upstream fails.
   */
  override def initialCompletionHandling: CompletionHandling = CompletionHandling(
    onUpstreamFinish = { (ctx, finished) =>
      // Remove the finished port by equality; this avoids downcasting the InPort to Inlet[A].
      remainingInlets --= remainingInlets.filter(_ == finished)
      if (remainingInlets.size == 1) {
        // Only one inlet is left: from here on the default completion handling applies.
        ctx.changeCompletionHandling(defaultCompletionHandling)
        readLast(remainingInlets.head)
      } else {
        readTail(remainingInlets.toIndexedSeq)
      }
    },
    onUpstreamFailure = { (ctx, _, cause) =>
      ctx.fail(cause)
      SameState
    }
  )

  /**
   * State that reads one element from each of `inlets` and emits them as one sequence,
   * in the order of `inlets`.
   */
  private def readTail(inlets: immutable.Seq[Inlet[A]]) = State(ReadAll(inlets)) { (ctx, _, inputs) =>
    val elements: immutable.Seq[A] = inlets.map(inlet => inputs(inlet))
    ctx.emit(elements)
    SameState
  }

  /** State that reads from the single remaining `inlet` and emits each element on its own. */
  private def readLast(inlet: Inlet[A]) = State(Read(inlet)) { (ctx, _, element) =>
    ctx.emit(Vector(element))
    SameState
  }
}
/**
 * Fan-in stage with two inlets of type `A` that uses [[ParallelMergeLogic]] to emit the
 * elements read from its inlets as one `immutable.Seq[A]`.
 *
 * @tparam A element type of both inlets
 */
class Parallel2Merge[A] extends FlexiMerge[immutable.Seq[A], FanInShape2[A, A, immutable.Seq[A]]](
  new FanInShape2("Parallel2Merge"), Attributes.name("Parallel2Merge")) {

  /**
   * Creates the merge logic for the ports of this stage.
   *
   * The inlets are passed in an insertion-ordered set so that the logic iterates them as
   * in0, in1 instead of in hash order.
   */
  def createMergeLogic(p: PortT) =
    new ParallelMergeLogic[A](mutable.LinkedHashSet[Inlet[A]](p.in0, p.in1))
}
/**
 * Fan-in stage with three inlets of type `A` that uses [[ParallelMergeLogic]] to emit the
 * elements read from its inlets as one `immutable.Seq[A]`.
 *
 * @tparam A element type of all inlets
 */
class Parallel3Merge[A] extends FlexiMerge[immutable.Seq[A], FanInShape3[A, A, A, immutable.Seq[A]]](
  new FanInShape3("Parallel3Merge"), Attributes.name("Parallel3Merge")) {

  /**
   * Creates the merge logic for the ports of this stage.
   *
   * The inlets are passed in an insertion-ordered set so that the logic iterates them as
   * in0, in1, in2 instead of in hash order.
   */
  def createMergeLogic(p: PortT) =
    new ParallelMergeLogic[A](mutable.LinkedHashSet[Inlet[A]](p.in0, p.in1, p.in2))
}
/**
 * Fan-in stage with four inlets of type `A` that uses [[ParallelMergeLogic]] to emit the
 * elements read from its inlets as one `immutable.Seq[A]`.
 *
 * @tparam A element type of all inlets
 */
class Parallel4Merge[A] extends FlexiMerge[immutable.Seq[A], FanInShape4[A, A, A, A, immutable.Seq[A]]](
  new FanInShape4("Parallel4Merge"), Attributes.name("Parallel4Merge")) {

  /**
   * Creates the merge logic for the ports of this stage.
   *
   * The inlets are passed in an insertion-ordered set so that the logic iterates them as
   * in0 through in3 instead of in hash order.
   */
  def createMergeLogic(p: PortT) =
    new ParallelMergeLogic[A](mutable.LinkedHashSet[Inlet[A]](p.in0, p.in1, p.in2, p.in3))
}
/**
 * Fan-in stage with five inlets of type `A` that uses [[ParallelMergeLogic]] to emit the
 * elements read from its inlets as one `immutable.Seq[A]`.
 *
 * @tparam A element type of all inlets
 */
class Parallel5Merge[A] extends FlexiMerge[immutable.Seq[A], FanInShape5[A, A, A, A, A, immutable.Seq[A]]](
  new FanInShape5("Parallel5Merge"), Attributes.name("Parallel5Merge")) {

  /**
   * Creates the merge logic for the ports of this stage.
   *
   * The inlets are passed in an insertion-ordered set so that the logic iterates them as
   * in0 through in4 instead of in hash order.
   */
  def createMergeLogic(p: PortT) =
    new ParallelMergeLogic[A](mutable.LinkedHashSet[Inlet[A]](p.in0, p.in1, p.in2, p.in3, p.in4))
}
/**
 * Fan-in stage with six inlets of type `A` that uses [[ParallelMergeLogic]] to emit the
 * elements read from its inlets as one `immutable.Seq[A]`.
 *
 * @tparam A element type of all inlets
 */
class Parallel6Merge[A] extends FlexiMerge[immutable.Seq[A], FanInShape6[A, A, A, A, A, A, immutable.Seq[A]]](
  new FanInShape6("Parallel6Merge"), Attributes.name("Parallel6Merge")) {

  /**
   * Creates the merge logic for the ports of this stage.
   *
   * The inlets are passed in an insertion-ordered set so that the logic iterates them as
   * in0 through in5 instead of in hash order.
   */
  def createMergeLogic(p: PortT) =
    new ParallelMergeLogic[A](mutable.LinkedHashSet[Inlet[A]](p.in0, p.in1, p.in2, p.in3, p.in4, p.in5))
}
/** Factory that selects the fixed-arity parallel merge stage for a given number of inputs. */
object ParallelMerge {

  /** Raises the error reported for any input count outside the supported range. */
  private def unsupportedInputSize: Nothing =
    throw new IllegalArgumentException("ParallelMerge can only be used for merging 2 - 6 inputs inclusive !!")

  /**
   * Builds the merge stage whose number of inlets equals `inputSize`.
   *
   * @param inputSize number of inputs to merge; 2 through 6 are supported
   * @tparam T element type of every input
   * @throws IllegalArgumentException if `inputSize` is not between 2 and 6 inclusive
   */
  def apply[T](inputSize: Int) = inputSize match {
    case 2 => new Parallel2Merge[T]
    case 3 => new Parallel3Merge[T]
    case 4 => new Parallel4Merge[T]
    case 5 => new Parallel5Merge[T]
    case 6 => new Parallel6Merge[T]
    case _ => unsupportedInputSize
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment