Skip to content

Instantly share code, notes, and snippets.

@Danthar
Last active July 20, 2021 14:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Danthar/94054550f42f287a3a0be78f142526bf to your computer and use it in GitHub Desktop.
Save Danthar/94054550f42f287a3a0be78f142526bf to your computer and use it in GitHub Desktop.
c# PassThroughFlow example Akka.Streams
/// <summary>
/// As translated from: https://doc.akka.io/docs/alpakka/current/patterns.html#passthrough
/// </summary>
public class PassThroughFlow
{
public static IGraph<FlowShape<A, (T,A)>, NotUsed> create<A,T>(Flow<A, T, NotUsed> flow) {
return create(flow, Keep.Both);
}
/// <summary>
/// a=>transform=>a1
/// / \
/// / \
/// a=>(a, a)=>unzip - zip=>(a1, a)=> a
/// \ /
/// \ /
/// --------a--------
/// </summary>
/// <param name="flow"></param>
/// <param name="output"></param>
/// <typeparam name="A"></typeparam>
/// <typeparam name="T"></typeparam>
/// <typeparam name="O"></typeparam>
/// <returns></returns>
private static IGraph<FlowShape<A, O>, NotUsed> create<A, T, O>(
Flow<A, T, NotUsed> flow, Func<T, A, O> output) {
return Flow.FromGraph(
GraphDsl.Create(builder => {
UniformFanOutShape<A, A> broadcast = builder.Add(new Broadcast<A>(2));
FanInShape<T, A, O> zip = builder.Add(new ZipWith<T,A, O>(output));
builder.From(broadcast.Out(0)).Via(builder.Add(flow)).To(zip.In0);
builder.From(broadcast.Out(1)).To(zip.In1);
return new FlowShape<A,O>(broadcast.In, zip.Out);
}));
}
public static async Task Example()
{
using var system = ActorSystem.Create("passtroughflow");
using var materialiser = system.Materializer();
{
Source<int,NotUsed> source = Source.From(new []{1,2,3});
// Pass through this flow maintaining the original message
Flow<int, int, NotUsed> passThroughMe = Flow.Create<int>().Select(i => i * 10);
IImmutableList<(int, int)> ret = await source.Via(PassThroughFlow.create(passThroughMe)).RunWith(Sink.Seq<(int,int)>(),materialiser);
IImmutableList<(int, int)> list = ret;
list[0].Equals((10,1));
list[0].Equals((20,2));
list[0].Equals((30,3));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment