Created
October 3, 2018 23:12
-
-
Save Aaronontheweb/15fd90ff517027c6e4d7c00e4a87fc80 to your computer and use it in GitHub Desktop.
Akka.NET Issue 3597
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.IO; | |
using System.Collections.Generic; | |
using System.Linq; | |
using Akka.Actor; | |
using Akka.Event; | |
using Akka.Routing; | |
namespace AkkaTest | |
{ | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
var actorSystem = ActorSystem.Create("ActorSystem"); | |
IActorRef coordinator = actorSystem.ActorOf(Props.Create(() => new Coordinator())); | |
for (int i = 0; i < 1000; i++) | |
{ | |
ChildActor.ProcessData processData = new ChildActor.ProcessData(i); | |
coordinator.Tell(processData); | |
} | |
coordinator.Tell(new Coordinator.DisposeAll()); | |
Console.ReadLine(); | |
} | |
} | |
public class Coordinator : ReceiveActor | |
{ | |
public class DisposeAll | |
{ | |
} | |
private readonly ILoggingAdapter _logger = Context.GetLogger(); | |
private readonly HashSet<IActorRef> _actors = new HashSet<IActorRef>(); | |
private IActorRef _consumer; | |
public Coordinator() | |
{ | |
Receive<ChildActor.ProcessData>(x => { _consumer.Tell(x); }); | |
Receive<int>(x => { | |
_actors.Add(Sender); | |
_logger.Info($"Tracking [{_actors.Count}] routees"); | |
}); | |
Receive<DisposeAll>(x => { _consumer.Tell(x); }); | |
} | |
protected override void PreStart() | |
{ | |
if (Context.Child("Consumer").Equals(ActorRefs.Nobody)) | |
{ | |
_consumer = Context.ActorOf( | |
Props.Create(() => new Consumer()) | |
, "Consumer"); | |
} | |
} | |
protected override SupervisorStrategy SupervisorStrategy() | |
{ | |
return new OneForOneStrategy(ex => | |
{ | |
if (ex is InvalidDataException) | |
{ | |
_logger.Info($"Resuming {Sender}"); | |
return Directive.Resume; | |
} | |
return Directive.Stop; | |
}); | |
} | |
} | |
public class Consumer : ReceiveActor | |
{ | |
private readonly ILoggingAdapter _logger = Context.GetLogger(); | |
private IActorRef _childRouter; | |
private int _progress; | |
public Consumer() | |
{ | |
Receive<ChildActor.ProcessData>(x => | |
{ | |
_progress++; | |
if(_progress%100==0) _logger.Info("{0} items pushed to router", _progress); | |
_childRouter.Forward(x); | |
}); | |
Receive<Terminated>(x => | |
{ | |
_logger.Error("Child Router terminated."); | |
}); | |
Receive<Coordinator.DisposeAll>(x => { _childRouter.Forward(new Broadcast(PoisonPill.Instance)); }); | |
ReceiveAsync<ReceiveTimeout>(async t => { | |
var routeeCount = (await _childRouter.Ask<Routees>(new GetRoutees(), TimeSpan.FromSeconds(3))).Members.Count(); | |
_logger.Info("Router has {0} remaining routees", routeeCount); | |
}); | |
} | |
protected override void PreStart() | |
{ | |
if (Context.Child("ChildRouter").Equals(ActorRefs.Nobody)) | |
{ | |
_childRouter = | |
Context.ActorOf( | |
Props.Create(() => new ChildActor()) | |
.WithRouter(new RoundRobinPool(100)) | |
.WithSupervisorStrategy(new OneForOneStrategy(ex => Directive.Escalate)), "ChildRouter"); | |
Context.Watch(_childRouter); | |
} | |
Context.SetReceiveTimeout(TimeSpan.FromSeconds(4)); | |
} | |
protected override SupervisorStrategy SupervisorStrategy() | |
{ | |
return new OneForOneStrategy(ex => Directive.Escalate); | |
} | |
} | |
public class ChildActor : ReceiveActor | |
{ | |
public class ProcessData | |
{ | |
public int Data { get; private set; } | |
public ProcessData(int data) | |
{ | |
Data = data; | |
} | |
} | |
private readonly ILoggingAdapter _logger = Context.GetLogger(); | |
public ChildActor() | |
{ | |
Receive<ProcessData>(x => | |
{ | |
Sender.Tell(1); | |
if (x.Data % 5 == 0) | |
{ | |
_logger.Info("{0} is Divisible by 5", x.Data); | |
} | |
else | |
{ | |
//if this line is commented, router terminates just fine | |
throw new InvalidDataException("Error while processing."); | |
} | |
}); | |
} | |
protected override void PostStop(){ | |
_logger.Info("Terminated"); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment