Skip to content

Instantly share code, notes, and snippets.

@Danthar
Last active July 21, 2021 13:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Danthar/d109e10619331b80f3352a6568a6e8ee to your computer and use it in GitHub Desktop.
Save Danthar/d109e10619331b80f3352a6568a6e8ee to your computer and use it in GitHub Desktop.
Unlimited parallel processing flows example
/// <summary>
/// Builds a single flow that hands every incoming element to each of the supplied
/// <paramref name="processingFlows"/> and recombines their outputs through a zip stage.
/// </summary>
/// <remarks>
/// Graph layout (shown for three processing flows):
/// <code>
///                     +--> process1 --+
///                     |               |
/// in --> BROADCAST ---+--> process2 --+--> ZIP --> out (first zipped result)
///                     |               |
///                     +--> process3 --+
/// </code>
/// Broadcast output <c>i</c> is wired through <c>processingFlows[i]</c> into zip input <c>i</c>.
/// The zip function returns only the first entry of the zipped result list; the results of the
/// other flows are discarded, so they act purely as completion signals.
/// NOTE(review): the zip stage presumably combines one element from every input per emission,
/// which means each processing flow is expected to emit exactly one element per input element -
/// confirm against the Akka.NET ZipWithN documentation.
/// </remarks>
/// <typeparam name="I">Element type accepted by the resulting flow and by every processing flow.</typeparam>
/// <typeparam name="O">Element type produced by every processing flow and by the resulting flow.</typeparam>
/// <param name="processingFlows">The flows to run side by side; at least one is required.</param>
/// <returns>A flow whose inlet is the broadcast input and whose outlet is the zip output.</returns>
/// <exception cref="System.ArgumentNullException"><paramref name="processingFlows"/> is <c>null</c>.</exception>
/// <exception cref="System.ArgumentException"><paramref name="processingFlows"/> is empty.</exception>
public static Flow<I, O, NotUsed> ParralelProcessingStream<I, O>(params Flow<I, O, NotUsed>[] processingFlows)
{
    // Fail fast with a descriptive argument exception instead of a NullReferenceException
    // raised later from inside the graph builder callback.
    if (processingFlows == null)
    {
        throw new System.ArgumentNullException(nameof(processingFlows));
    }

    // A fan-out/fan-in graph with zero branches cannot be wired; reject it up front.
    if (processingFlows.Length == 0)
    {
        throw new System.ArgumentException("At least one processing flow is required.", nameof(processingFlows));
    }

    return Flow.FromGraph(GraphDsl.Create(builder =>
    {
        var maxFlows = processingFlows.Length;
        var broadcast = builder.Add(new Broadcast<I>(maxFlows));
        var zip = builder.Add(new ZipWithN<O, O>(result => result.First(), maxFlows));

        for (var i = 0; i < maxFlows; i++)
        {
            // Connect a broadcast output to the input of the processing flow,
            // and the output of that flow to the matching input of the zip stage.
            builder.From(broadcast.Out(i)).Via(builder.Add(processingFlows[i])).To(zip.In(i));
        }

        // Expose the broadcast input and the zip output as the shape of the composed flow.
        return new FlowShape<I, O>(broadcast.In, zip.Out);
    }));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment