Skip to content

Instantly share code, notes, and snippets.

@Aaronontheweb
Created February 12, 2021 16:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Aaronontheweb/3178c4f38130d245ed03ceab9b5355a8 to your computer and use it in GitHub Desktop.
Save Aaronontheweb/3178c4f38130d245ed03ceab9b5355a8 to your computer and use it in GitHub Desktop.
Balanced Akka.Streams Consumption with MergeHub
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams.Dsl;
namespace Akka.Streams.Recipies
{
class Program
{
static async Task Main(string[] args)
{
var actorSytem = ActorSystem.Create("akkaStreamsTest");
var materializer = actorSytem.Materializer();
var setup1 = new StreamsSetup();
var sink = Sink.ForEach<string>(str => Console.WriteLine(str));
var graph = MergeHub.Source<string>()
.Via(Flow.Create<string>().SelectAsync(5, async x =>
{
await Task.Delay(250); // simulate some latency
return x;
}))
.To(sink);
// start running the merge hub
var runnableSink = graph.Run(materializer);
// attach busy producers first
setup1.Noisy1000.RunWith(runnableSink, materializer);
setup1.Noisy500.RunWith(runnableSink, materializer);
await Task.Delay(250);
// should attach "quieter" producers and still see their output appear
// without having to wait for Noisy1000 and Noisy500 to complete
setup1.Normal10.RunWith(runnableSink, materializer);
setup1.Normal100.RunWith(runnableSink, materializer);
await Task.Delay(250);
// attach our quiet, infrequently used source at the end
setup1.Quiet1.RunWith(runnableSink, materializer);
await actorSytem.WhenTerminated;
}
}
public sealed class StreamsSetup
{
public Source<string, NotUsed> Noisy1000 { get; }
public Source<string, NotUsed> Noisy500 { get; }
public Source<string, NotUsed> Normal100 { get; }
public Source<string, NotUsed> Normal10 { get; }
public Source<string, NotUsed> Quiet1 { get; }
public StreamsSetup()
{
Noisy1000 = Source.From(Enumerable.Range(0, 1000).Select(x => $"noisy1000-{x}"));
Noisy500 = Source.From(Enumerable.Range(0, 500).Select(x => $"noisy500-{x}"));
Normal100 = Source.From(Enumerable.Range(0, 100).Select(x => $"normal100-{x}"));
Normal10 = Source.From(Enumerable.Range(0, 10).Select(x => $"normal10-{x}"));
Quiet1 = Source.From(Enumerable.Range(0, 1).Select(x => $"quiet1-{x}"));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment