Skip to content

Instantly share code, notes, and snippets.

@bwaterschoot
Created October 5, 2016 11:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bwaterschoot/fcafc601941a9e55d7bf687ee97dc26a to your computer and use it in GitHub Desktop.
Save bwaterschoot/fcafc601941a9e55d7bf687ee97dc26a to your computer and use it in GitHub Desktop.
Unhandled message from supervisor
...
DEBUG Actors.Worker - Started
DEBUG Actors.Worker - Started
DEBUG Actors.Worker - Started
DEBUG Actors.Worker - Started
DEBUG Actors.Worker - now watched by [akka://MessagePipelineSystem/user/Supervisor_emailsender]
DEBUG Actors.Worker - now watched by [akka://MessagePipelineSystem/user/Supervisor_emailsender]
DEBUG Actors.Worker - now watched by [akka://MessagePipelineSystem/user/Supervisor_emailsender]
DEBUG Actors.Worker - now watched by [akka://MessagePipelineSystem/user/Supervisor_emailsender]
DEBUG Akka.Actor.LocalActorRef - Unhandled message from akka://MessagePipelineSystem/user/Supervisor_emailsender : Messages.StartProcessing
DEBUG Akka.Actor.LocalActorRef - Unhandled message from akka://MessagePipelineSystem/user/Supervisor_emailsender : Messages.StartProcessing
DEBUG Akka.Actor.LocalActorRef - Unhandled message from akka://MessagePipelineSystem/user/Supervisor_emailsender : Messages.StartProcessing
DEBUG Akka.Actor.LocalActorRef - Unhandled message from akka://MessagePipelineSystem/user/Supervisor_emailsender : Messages.StartProcessing
...
/// <summary>
/// Actor that keeps track of registered workers and switches between a
/// stopped and a running behavior in response to <c>StartProcessing</c> and
/// <c>StopProcessing</c> messages.
/// </summary>
/// <remarks>
/// While stopped, <c>AvailableForWork</c> messages are stashed; they are
/// unstashed when the running behavior is installed. While running, each
/// <c>AvailableForWork</c> is turned into a <c>RequestMessage</c> sent to the
/// message dispatcher with the worker as sender, and handling results are
/// recorded on the progress monitor and forwarded to the message sink.
/// </remarks>
internal class Supervisor : ReceiveActor, IWithUnboundedStash
{
    private readonly ILoggingAdapter _log;
    private readonly IActorRef _messageDispatcher;
    private readonly IActorRef _messageSink;
    private readonly IProgressMonitor _progressMonitor;
    private readonly ISet<IActorRef> _registeredWorkers;
    private MessagePipelineStatus _status;

    /// <summary>
    /// Creates the supervisor with an empty set of registered workers.
    /// </summary>
    /// <param name="progressMonitor">Receives the processing time of every handled message.</param>
    /// <param name="messageDispatcher">Actor that is told about start/stop and asked for messages on behalf of workers.</param>
    /// <param name="messageSink">Actor that handling results are forwarded to.</param>
    public Supervisor(IProgressMonitor progressMonitor, IActorRef messageDispatcher, IActorRef messageSink)
    {
        _progressMonitor = progressMonitor;
        _messageDispatcher = messageDispatcher;
        _messageSink = messageSink;
        _registeredWorkers = new HashSet<IActorRef>();
        _log = Context.GetLogger();
    }

    /// <summary>
    /// Gets or sets the stash used to hold <c>AvailableForWork</c> messages while stopped.
    /// </summary>
    public IStash Stash { get; set; }

    /// <summary>
    /// Installs the stopped behavior when the actor starts.
    /// </summary>
    protected override void PreStart()
    {
        Become(Stopped);
    }

    /// <summary>
    /// Behavior while processing is paused: worker bookkeeping still runs,
    /// work requests are stashed, and <c>StartProcessing</c> switches to
    /// <see cref="Running"/>.
    /// </summary>
    private void Stopped()
    {
        _status = MessagePipelineStatus.Paused;

        OnRegisterWorker();
        OnScaleDown();
        OnTermination();

        Receive<RequestStatus>(m => Sender.Tell(_status));
        Receive<StartProcessing>(m =>
        {
            _messageDispatcher.Tell(m);
            BroadcastToWorkers(m);
            Become(Running);
        });

        // Hold on to work requests until the supervisor is started again.
        Receive<AvailableForWork>(m => Stash.Stash());
        Receive<StopProcessing>(_ => _log.Info($"Calling stop on an already stopped supervisor '{Self}'"));
    }

    /// <summary>
    /// Behavior while processing is active: work requests are passed on to
    /// the dispatcher, handling results are recorded and forwarded, and
    /// <c>StopProcessing</c> switches back to <see cref="Stopped"/>.
    /// </summary>
    private void Running()
    {
        _status = MessagePipelineStatus.Started;

        // Replay the work requests that were stashed while stopped.
        Stash.UnstashAll();

        Receive<RequestStatus>(m => Sender.Tell(_status));
        Receive<StopProcessing>(m =>
        {
            _messageDispatcher.Tell(m);
            BroadcastToWorkers(m);
            Become(Stopped);
        });

        OnRegisterWorker();
        OnScaleDown();
        // OnTermination registers the only Terminated handler for this
        // behavior; a second registration after it would never be reached.
        OnTermination();

        // The worker is passed as the sender of the request.
        Receive<AvailableForWork>(m => _messageDispatcher.Tell(new RequestMessage(), m.WorkerRef));
        Receive<MessageHandlingFailed>(m =>
        {
            // Final argument is false for failures, true for successes.
            _progressMonitor.RecordProcessingTime(m.Envelope, m.TimeSpentProcessing, false);
            _messageSink.Forward(m);
        });
        Receive<MessageHandlingSuccessful>(m =>
        {
            _progressMonitor.RecordProcessingTime(m.Envelope, m.TimeSpentProcessing, true);
            _messageSink.Forward(m);
        });
        Receive<StartProcessing>(_ => _log.Info($"Calling start on an already started supervisor '{Self}'"));
    }

    /// <summary>
    /// Registers the handler that adds a worker to the registered set and
    /// starts watching it.
    /// </summary>
    private void OnRegisterWorker()
    {
        Receive<RegisterWorker>(m =>
        {
            _registeredWorkers.Add(m.WorkerRef);
            Context.Watch(m.WorkerRef);
        });
    }

    /// <summary>
    /// Registers the handler that stops the requested number of workers.
    /// A request that would remove every registered worker (or more) is
    /// ignored with a warning.
    /// </summary>
    private void OnScaleDown()
    {
        Receive<ScaleDownBy>(m =>
        {
            if (_registeredWorkers.Count <= m.NumberOfWorkers)
            {
                _log.Warning($"Tried to scale down by {m.NumberOfWorkers} but only {_registeredWorkers.Count} are available. Ignored scale down request.");
                return;
            }

            // Snapshot the workers to stop before mutating the set: indexing
            // into the set while removing from it skips entries and can run
            // past the end of the shrinking collection.
            var workersToStop = _registeredWorkers.Take(m.NumberOfWorkers).ToList();
            foreach (var worker in workersToStop)
            {
                worker.Tell(PoisonPill.Instance);
                _registeredWorkers.Remove(worker);
            }

            _log.Info(
                $"Scaling down workers by {m.NumberOfWorkers}. Number of workers remaining: {_registeredWorkers.Count}");
        });
    }

    /// <summary>
    /// Registers the handler that drops a terminated worker from the
    /// registered set. Terminations of actors that are not (or no longer)
    /// registered are ignored.
    /// </summary>
    private void OnTermination()
    {
        Receive<Terminated>(t =>
        {
            // Remove reports whether the worker was still registered.
            if (_registeredWorkers.Remove(t.ActorRef))
            {
                _log.Error($"Worker '{t.ActorRef}' died, removing it from available actors.");
            }
        });
    }

    /// <summary>
    /// Sends the given message to every registered worker.
    /// </summary>
    /// <typeparam name="T">Type of the message.</typeparam>
    /// <param name="m">Message to send.</param>
    private void BroadcastToWorkers<T>(T m) where T : class
    {
        foreach (var registeredWorker in _registeredWorkers)
        {
            registeredWorker.Tell(m);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment