Skip to content

Instantly share code, notes, and snippets.

@Aaronontheweb
Created October 3, 2018 23:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Aaronontheweb/15fd90ff517027c6e4d7c00e4a87fc80 to your computer and use it in GitHub Desktop.
Save Aaronontheweb/15fd90ff517027c6e4d7c00e4a87fc80 to your computer and use it in GitHub Desktop.
Akka.NET Issue 3597
using System;
using System.IO;
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;
using Akka.Event;
using Akka.Routing;
namespace AkkaTest
{
class Program
{
static void Main(string[] args)
{
var actorSystem = ActorSystem.Create("ActorSystem");
IActorRef coordinator = actorSystem.ActorOf(Props.Create(() => new Coordinator()));
for (int i = 0; i < 1000; i++)
{
ChildActor.ProcessData processData = new ChildActor.ProcessData(i);
coordinator.Tell(processData);
}
coordinator.Tell(new Coordinator.DisposeAll());
Console.ReadLine();
}
}
public class Coordinator : ReceiveActor
{
public class DisposeAll
{
}
private readonly ILoggingAdapter _logger = Context.GetLogger();
private readonly HashSet<IActorRef> _actors = new HashSet<IActorRef>();
private IActorRef _consumer;
public Coordinator()
{
Receive<ChildActor.ProcessData>(x => { _consumer.Tell(x); });
Receive<int>(x => {
_actors.Add(Sender);
_logger.Info($"Tracking [{_actors.Count}] routees");
});
Receive<DisposeAll>(x => { _consumer.Tell(x); });
}
protected override void PreStart()
{
if (Context.Child("Consumer").Equals(ActorRefs.Nobody))
{
_consumer = Context.ActorOf(
Props.Create(() => new Consumer())
, "Consumer");
}
}
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(ex =>
{
if (ex is InvalidDataException)
{
_logger.Info($"Resuming {Sender}");
return Directive.Resume;
}
return Directive.Stop;
});
}
}
public class Consumer : ReceiveActor
{
private readonly ILoggingAdapter _logger = Context.GetLogger();
private IActorRef _childRouter;
private int _progress;
public Consumer()
{
Receive<ChildActor.ProcessData>(x =>
{
_progress++;
if(_progress%100==0) _logger.Info("{0} items pushed to router", _progress);
_childRouter.Forward(x);
});
Receive<Terminated>(x =>
{
_logger.Error("Child Router terminated.");
});
Receive<Coordinator.DisposeAll>(x => { _childRouter.Forward(new Broadcast(PoisonPill.Instance)); });
ReceiveAsync<ReceiveTimeout>(async t => {
var routeeCount = (await _childRouter.Ask<Routees>(new GetRoutees(), TimeSpan.FromSeconds(3))).Members.Count();
_logger.Info("Router has {0} remaining routees", routeeCount);
});
}
protected override void PreStart()
{
if (Context.Child("ChildRouter").Equals(ActorRefs.Nobody))
{
_childRouter =
Context.ActorOf(
Props.Create(() => new ChildActor())
.WithRouter(new RoundRobinPool(100))
.WithSupervisorStrategy(new OneForOneStrategy(ex => Directive.Escalate)), "ChildRouter");
Context.Watch(_childRouter);
}
Context.SetReceiveTimeout(TimeSpan.FromSeconds(4));
}
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(ex => Directive.Escalate);
}
}
public class ChildActor : ReceiveActor
{
public class ProcessData
{
public int Data { get; private set; }
public ProcessData(int data)
{
Data = data;
}
}
private readonly ILoggingAdapter _logger = Context.GetLogger();
public ChildActor()
{
Receive<ProcessData>(x =>
{
Sender.Tell(1);
if (x.Data % 5 == 0)
{
_logger.Info("{0} is Divisible by 5", x.Data);
}
else
{
//if this line is commented, router terminates just fine
throw new InvalidDataException("Error while processing.");
}
});
}
protected override void PostStop(){
_logger.Info("Terminated");
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment