Skip to content

Instantly share code, notes, and snippets.

@bwaterschoot
Created June 20, 2016 11:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bwaterschoot/a2401f23f075c4ed3b891437832b6b53 to your computer and use it in GitHub Desktop.
Save bwaterschoot/a2401f23f075c4ed3b891437832b6b53 to your computer and use it in GitHub Desktop.
Watcher doesn't receive Terminated message
/// <summary>
/// Actor that keeps a set of registered worker refs and, while in the
/// <see cref="Stopped"/> behavior, can stop a requested number of them by
/// sending each a <see cref="PoisonPill"/>.
/// </summary>
public class Supervisor : ReceiveActor, IWithUnboundedStash
{
    private readonly ISet<IActorRef> _registeredWorkers;
    private readonly ILoggingAdapter _log;

    public Supervisor()
    {
        _registeredWorkers = new HashSet<IActorRef>();
        _log = Context.GetLogger();
    }

    /// <summary>
    /// Stash used to defer <c>AvailableForWork</c> messages.
    /// NOTE(review): presumably populated by Akka.NET because the actor
    /// implements <c>IWithUnboundedStash</c> - confirm.
    /// </summary>
    public IStash Stash { get; set; }

    /// <summary>Switches the actor into the <see cref="Stopped"/> behavior on start.</summary>
    protected override void PreStart()
    {
        Become(Stopped);
    }

    /// <summary>
    /// Message handlers for the stopped state: scale down, register workers,
    /// and stash availability notifications.
    /// </summary>
    private void Stopped()
    {
        Receive<ScaleDownBy>(m =>
        {
            // Snapshot the workers to stop BEFORE mutating the set. Indexing with
            // ElementAt(i) while removing shifts the remaining elements, which
            // skipped workers and threw once i reached the shrinking Count.
            // Take() also copes with a request larger than the number of
            // registered workers (and yields nothing for a non-positive count).
            var workersToStop = _registeredWorkers.Take(m.NumberOfWorkers).ToList();
            foreach (var worker in workersToStop)
            {
                worker.Tell(PoisonPill.Instance);
                _registeredWorkers.Remove(worker);
            }

            _log.Info(
                $"Scaling down workers by {m.NumberOfWorkers}. Number of workers remaining: {_registeredWorkers.Count}");
        });

        // Statement body on purpose: an expression-bodied lambda would return the
        // bool from ISet.Add and bind to the Func<T, bool> overload, so a duplicate
        // registration (Add == false) would be reported as an unhandled message.
        Receive<RegisterWorker>(m =>
        {
            _registeredWorkers.Add(m.WorkerRef);
        });

        Receive<AvailableForWork>(m => Stash.Stash());
    }
}
/// <summary>
/// Tests for <c>Supervisor</c> scale-down behavior.
/// </summary>
[TestFixture]
public class SupervisorTests : TestKit
{
    /// <summary>
    /// Registers a single probe-backed worker, asks the supervisor to scale down
    /// by one, and expects a watching probe to observe the worker's termination.
    /// </summary>
    [Test]
    public void When_supervisor_is_asked_to_scale_down_it_should_terminate_workers()
    {
        var worker1 = CreateTestProbe("worker1");
        var supervisor = new SupervisorBuilder(this).Build();
        supervisor.Tell(new RegisterWorker(worker1.Ref));

        var watcher = CreateTestProbe();
        // Watch and expect on the same ref that was registered with the supervisor
        // (worker1.Ref) rather than the probe wrapper object.
        // NOTE(review): passing the TestProbe itself is suspected to be why the
        // Terminated message never matched - confirm against the Akka.NET TestKit
        // version in use.
        watcher.Watch(worker1.Ref);

        supervisor.Tell(new ScaleDownBy(1));

        watcher.ExpectTerminated(worker1.Ref);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment