Last active
July 21, 2021 13:18
-
-
Save Danthar/d109e10619331b80f3352a6568a6e8ee to your computer and use it in GitHub Desktop.
Unlimited Parrallel processing flows example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// a=>process1 =>ack | |
/// / \ | |
/// / \ | |
/// a=>(a)=>BROADCAST - a=>process2 =>ack ZIP=>transform(a, completionSignal) | |
/// \ / | |
/// \ / | |
/// a=>process3 => ack | |
public static Flow<I, O, NotUsed> ParralelProcessingStream<I, O>(params Flow<I, O, NotUsed>[] processingFlows) | |
{ | |
return Flow.FromGraph(GraphDsl.Create(builder => | |
{ | |
var maxFlows = processingFlows.Length; | |
var broadcast = builder.Add(new Broadcast<I>(maxFlows)); | |
var zip = builder.Add(new ZipWithN<O, O>(result => result.First(), maxFlows)); | |
for (var i = 0; i < maxFlows; i++) | |
//connect a broadcast output to the input of the processing flow, and the output of the flow to an input of the zipstage | |
builder.From(broadcast.Out(i)).Via(builder.Add(processingFlows[i])).To(zip.In(i)); | |
//return the input of the broadcast element, and the output of the zip stage | |
return new FlowShape<I, O>(broadcast.In, zip.Out); | |
})); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment