Skip to content

Instantly share code, notes, and snippets.

Created October 3, 2018 23:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Aaronontheweb/15fd90ff517027c6e4d7c00e4a87fc80 to your computer and use it in GitHub Desktop.
Save Aaronontheweb/15fd90ff517027c6e4d7c00e4a87fc80 to your computer and use it in GitHub Desktop.
Akka.NET Issue 3597
using System;
using System.IO;
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;
using Akka.Event;
using Akka.Routing;
namespace AkkaTest
// Console entry point of the repro: starts an actor system, creates a Coordinator,
// builds 1000 ProcessData messages and finally tells the coordinator DisposeAll.
// NOTE(review): brace-only lines (and possibly whole statements) appear to have been lost
// when this text was extracted, so block boundaries here are inferred — confirm against the original.
class Program
static void Main(string[] args)
var actorSystem = ActorSystem.Create("ActorSystem");
// The coordinator is created as a top-level actor without an explicit name.
IActorRef coordinator = actorSystem.ActorOf(Props.Create(() => new Coordinator()));
for (int i = 0; i < 1000; i++)
// NOTE(review): processData is constructed but no statement that sends it is visible;
// presumably a `coordinator.Tell(processData)` line was dropped — verify.
ChildActor.ProcessData processData = new ChildActor.ProcessData(i);
// Presumably sent once, after the loop — the Coordinator relays DisposeAll to its consumer.
coordinator.Tell(new Coordinator.DisposeAll());
// Actor that relays ProcessData and DisposeAll messages to a single child named "Consumer"
// and supervises that child.
// NOTE(review): brace-only lines were lost in extraction; nesting below is inferred.
public class Coordinator : ReceiveActor
// Payload-free message type; the Coordinator passes it on to the consumer unchanged.
public class DisposeAll
private readonly ILoggingAdapter _logger = Context.GetLogger();
// NOTE(review): nothing visible in this class adds to _actors, so the count logged
// by the int handler would stay at 0 — confirm whether population code was dropped.
private readonly HashSet<IActorRef> _actors = new HashSet<IActorRef>();
// Assigned in PreStart; the handlers below tell messages to it.
private IActorRef _consumer;
// Registers the three message handlers.
public Coordinator()
// Work items are told to the consumer as-is.
Receive<ChildActor.ProcessData>(x => { _consumer.Tell(x); });
// Any int message logs the current size of _actors; the int value itself is not used
// in the visible code.
Receive<int>(x => {
_logger.Info($"Tracking [{_actors.Count}] routees");
// DisposeAll is told to the consumer as-is.
Receive<DisposeAll>(x => { _consumer.Tell(x); });
// Creates the "Consumer" child when Context.Child("Consumer") equals ActorRefs.Nobody.
// NOTE(review): if a child with that name already existed, _consumer would not be assigned
// by the visible code — confirm that case cannot occur.
protected override void PreStart()
if (Context.Child("Consumer").Equals(ActorRefs.Nobody))
_consumer = Context.ActorOf(
Props.Create(() => new Consumer())
, "Consumer");
// Supervision decider: InvalidDataException -> log and Resume; presumably every other
// exception falls through to Stop (branch structure inferred, braces missing).
protected override SupervisorStrategy SupervisorStrategy()
return new OneForOneStrategy(ex =>
if (ex is InvalidDataException)
_logger.Info($"Resuming {Sender}");
return Directive.Resume;
return Directive.Stop;
// Actor that holds a router named "ChildRouter" (a RoundRobinPool of 100 ChildActor routees)
// and handles ProcessData, Terminated, DisposeAll and ReceiveTimeout messages.
// NOTE(review): brace-only lines and apparently some statements were lost in extraction;
// the notes below flag what looks missing — confirm against the original.
public class Consumer : ReceiveActor
private readonly ILoggingAdapter _logger = Context.GetLogger();
// Assigned in PreStart.
private IActorRef _childRouter;
// Counter used only for the progress log in the visible code.
private int _progress;
// Registers the message handlers.
public Consumer()
// Logs whenever _progress is a multiple of 100.
// NOTE(review): no increment of _progress and no send to _childRouter is visible here,
// although the log text says items are "pushed to router" — presumably dropped lines.
Receive<ChildActor.ProcessData>(x =>
if(_progress%100==0) _logger.Info("{0} items pushed to router", _progress);
// Logs an error when a Terminated message arrives.
// NOTE(review): no Context.Watch call is visible in this class — verify the router is watched.
Receive<Terminated>(x =>
_logger.Error("Child Router terminated.");
// On DisposeAll, forwards a Broadcast-wrapped PoisonPill to the router.
Receive<Coordinator.DisposeAll>(x => { _childRouter.Forward(new Broadcast(PoisonPill.Instance)); });
// On ReceiveTimeout, asks the router for its routees (3 second ask timeout) and logs how many remain.
// NOTE(review): no SetReceiveTimeout call is visible, so it is unclear from here when this fires.
ReceiveAsync<ReceiveTimeout>(async t => {
var routeeCount = (await _childRouter.Ask<Routees>(new GetRoutees(), TimeSpan.FromSeconds(3))).Members.Count();
_logger.Info("Router has {0} remaining routees", routeeCount);
// Creates the "ChildRouter" child when no child with that name exists.
// NOTE(review): the trailing `, "ChildRouter");` implies an enclosing call (presumably
// `Context.ActorOf(`) between `_childRouter =` and `Props.Create` that is not visible here.
protected override void PreStart()
if (Context.Child("ChildRouter").Equals(ActorRefs.Nobody))
_childRouter =
Props.Create(() => new ChildActor())
.WithRouter(new RoundRobinPool(100))
.WithSupervisorStrategy(new OneForOneStrategy(ex => Directive.Escalate)), "ChildRouter");
// Every child failure is escalated to this actor's parent.
protected override SupervisorStrategy SupervisorStrategy()
return new OneForOneStrategy(ex => Directive.Escalate);
// Worker actor that handles ProcessData messages and throws InvalidDataException
// from its handler.
// NOTE(review): brace-only lines were lost in extraction; nesting below is inferred.
public class ChildActor : ReceiveActor
// Message carrying a single int payload, set once in the constructor.
public class ProcessData
public int Data { get; private set; }
public ProcessData(int data)
Data = data;
private readonly ILoggingAdapter _logger = Context.GetLogger();
// Registers the ProcessData handler.
public ChildActor()
Receive<ProcessData>(x =>
// Values divisible by 5 are logged.
// NOTE(review): with braces missing it is unclear whether the throw below is inside this
// `if` or runs for every message — confirm against the original.
if (x.Data % 5 == 0)
_logger.Info("{0} is Divisible by 5", x.Data);
// Author's note: if the throw below is commented out, the router terminates just fine.
throw new InvalidDataException("Error while processing.");
protected override void PostStop(){
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment