Skip to content

Instantly share code, notes, and snippets.

@Danthar
Last active August 29, 2015 14:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Danthar/5e2812d3d6ca31129c01 to your computer and use it in GitHub Desktop.
Save Danthar/5e2812d3d6ca31129c01 to your computer and use it in GitHub Desktop.
Router supervision strategies
using System;
using Akka.Actor;
using Akka.Routing;
namespace Akka.RouterStrategies
{
/// <summary>
/// Console entry point for the router supervision experiment.
/// Boots the actor system, fires a burst of integer messages (each of which
/// makes a worker throw) followed by one string message, then blocks until
/// the actor system terminates.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        // Number of integer messages sent before the single string message.
        const int failingMessageCount = 10;

        var system = ActorSystem.Create("failboat");
        var actor = system.ActorOf(Props.Create(() => new Importer()), "start");

        for (var sent = 0; sent < failingMessageCount; sent++)
        {
            actor.Tell(5); //this will fail
        }

        actor.Tell("This message will work");

        // Keeps the process alive until the actor system is shut down.
        system.AwaitTermination();
    }
}
/// <summary>
/// Top-level actor of the experiment. It owns two children: an
/// <see cref="IntermediateActor"/> that receives all incoming work, and a
/// <see cref="Reporter"/> that is passed along as the sender of that work.
/// When the watched intermediate actor terminates, the whole actor system is
/// shut down.
/// </summary>
class Importer : UntypedActor
{
    private IActorRef forwarder;
    private IActorRef reporter;

    /// <summary>
    /// Creates the "intermediate" and "reporter" children and starts watching
    /// the intermediate actor so this actor is told when it terminates.
    /// </summary>
    protected override void PreStart()
    {
        forwarder = Context.ActorOf(Props.Create(() => new IntermediateActor()), "intermediate");
        Context.Watch(forwarder);
        reporter = Context.ActorOf(Props.Create(() => new Reporter()), "reporter");
    }

    /// <summary>
    /// Shuts the system down on <see cref="Terminated"/>; hands every other
    /// message to the intermediate actor with the reporter set as sender.
    /// </summary>
    /// <param name="message">The incoming message.</param>
    protected override void OnReceive(object message)
    {
        if (message is Terminated)
        {
            // The forwarder is the only actor watched here, so its death ends the run.
            Console.WriteLine("Shutting down");
            Context.System.Shutdown();
            return;
        }

        forwarder.Tell(message, reporter);
    }

    /// <summary>
    /// Supervision for this actor's children: every failure stops the failing
    /// child. An <see cref="ArgumentException"/> additionally logs a message first.
    /// </summary>
    /// <returns>A one-for-one strategy whose decider always returns <c>Directive.Stop</c>.</returns>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(ex =>
        {
            if (ex is ArgumentException)
            {
                Console.WriteLine("Unrecoverable database error - stopping");
            }

            return Directive.Stop;
        });
    }
}
/// <summary>
/// Sits between <see cref="Importer"/> and the workers. On start it creates a
/// round-robin pool router of <see cref="worker"/> actors named "worker" and
/// afterwards forwards every message it receives to that router.
/// </summary>
class IntermediateActor : ReceiveActor
{
    // Number of routees in the round-robin pool.
    private const int PoolSize = 5;

    // Retry limit and time window given to both all-for-one strategies below.
    private const int MaxRetries = 5;
    private static readonly TimeSpan RetryWindow = TimeSpan.FromSeconds(10);

    private IActorRef workerRouter;

    public IntermediateActor()
    {
        // Forward (rather than Tell) so the original sender is preserved.
        Receive<object>(incoming => workerRouter.Forward(incoming));
    }

    /// <summary>
    /// Builds the pool router, configured with <see cref="CreateRouterStrategy"/>,
    /// as a child of this actor.
    /// </summary>
    protected override void PreStart()
    {
        var pool = new RoundRobinPool(PoolSize).WithSupervisorStrategy(CreateRouterStrategy());
        var routedWorkerProps = Props.Create<worker>().WithRouter(pool);

        workerRouter = Context.ActorOf(routedWorkerProps, "worker");
    }

    /// <summary>
    /// Strategy handed to the pool router for supervising its routees.
    /// </summary>
    private SupervisorStrategy CreateRouterStrategy()
    {
        // Author's note: this decider is expected to be hit, but it never is.
        return new AllForOneStrategy(MaxRetries, RetryWindow, failure => Directive.Escalate);
    }

    /// <summary>
    /// Strategy this actor applies to its own children (the router).
    /// </summary>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        // Author's note: when the router fails, this actor is its parent,
        // so this decider is expected to be hit as well.
        return new AllForOneStrategy(MaxRetries, RetryWindow, failure => Directive.Escalate);
    }
}
/// <summary>
/// Actor that writes a fixed line to the console for every message it
/// receives, whatever the message is.
/// </summary>
class Reporter : ReceiveActor
{
    public Reporter()
    {
        // The message content is not inspected; only its arrival is reported.
        Receive<object>(ignored => Console.WriteLine("reporter received message"));
    }
}
/// <summary>
/// Routee actor. Logs every message it receives; integer messages make it
/// throw an <see cref="ArgumentException"/>, anything else is sent back to
/// the sender unchanged.
/// </summary>
class worker : ReceiveActor
{
    public worker()
    {
        Receive<object>(payload => ProcessOrFail(payload));
    }

    /// <summary>
    /// Handles a single message as described on the class.
    /// </summary>
    /// <param name="payload">The received message.</param>
    /// <exception cref="ArgumentException">Thrown when <paramref name="payload"/> is an <c>int</c>.</exception>
    private void ProcessOrFail(object payload)
    {
        Console.WriteLine("received worker: " + payload);

        if (!(payload is int))
        {
            Sender.Tell(payload);
            return;
        }

        // Integers simulate a failing unit of work.
        throw new ArgumentException("retry error: " + payload);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment