Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
This Akka.Net Actor implementation works fine in a non-clustered environment. If I enable clustering using a clustered round-robin-pool router on a single node I start getting dead letters after a short period of time which eventually stops executing jobs.
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Akka.Actor;
using Akka.DI.Core;
using Akka.Routing;
using Prototype.Common.Messages;
using Prototype.Messages;
using Prototype.Shared.Logger;
namespace Prototype.Actors
{
internal class JobCoordinatorActor : ReceiveActor, IWithUnboundedStash
{
private readonly ILogger _logger;
private readonly IActorRef _workerActor;
private JobState _jobState;
private string _jobIdentification;
public IStash Stash { get; set; }
public JobCoordinatorActor(ILogger logger)
{
_logger = logger;
_workerActor = Context.ActorOf(Context.DI().Props<WorkerActor>()
.WithRouter(new RoundRobinPool(10)), "worker");
Ready();
}
private void Running()
{
Receive<WorkerCompletedMessage>(message => HandleWorkerCompletedMessage(message));
Receive<StartJobMessage>(message => HandleStartJobMessageWhileRunning());
}
private void HandleStartJobMessageWhileRunning()
{
Stash.Stash();
}
private void Ready()
{
Receive<StartJobMessage>(message => HandleStartJobMessage(message));
}
private void HandleWorkerCompletedMessage(WorkerCompletedMessage message)
{
_jobState.JobDetailStates.First(x => x.PersonId == message.PersonId).Processed = true;
if (_jobState.JobDetailStates.All(x => x.Processed))
{
_jobState.Timer.Stop();
_logger.Log("Job {0} completed in {1} ms", message.JobIdentification, _jobState.Timer.ElapsedMilliseconds);
Context.ActorSelection("akka.tcp://MonitorActorSystem@127.0.0.1:8091/user/JobMonitor")
.Tell(new JobMonitorMessage(message.JobIdentification, "Completed", 0, _jobState.Timer.ElapsedMilliseconds));
_logger.Log("JobMonitor completed message sent!");
Become(Ready);
Stash.Unstash();
}
}
private void HandleStartJobMessage(StartJobMessage message)
{
_jobIdentification = message.JobIdentification;
var jobDetailStates = message.PersonIds.Select(personId => new JobDetailState(personId)).ToList();
_jobState = new JobState(Stopwatch.StartNew(), jobDetailStates);
Context.ActorSelection("akka.tcp://MonitorActorSystem@127.0.0.1:8091/user/JobMonitor")
.Tell(new JobMonitorMessage(message.JobIdentification, "Running", message.PersonIds.Count()));
_logger.Log("JobMonitor running message sent!");
foreach (var personId in message.PersonIds)
{
_workerActor.Tell(new StartWorkerMessage(_jobIdentification, message.Periode, personId));
}
Become(Running);
}
private class JobState
{
public JobState(Stopwatch timer, IList<JobDetailState> jobDetailStates)
{
Timer = timer;
JobDetailStates = jobDetailStates;
}
public IList<JobDetailState> JobDetailStates { get; set; }
public Stopwatch Timer { get; set; }
}
private class JobDetailState
{
public JobDetailState(int personId)
{
PersonId = personId;
}
public int PersonId { get; set; }
public bool Processed { get; set; }
}
}
}
namespace Prototype.Messages
{
public class StartJobMessage
{
public StartJobMessage(string jobIdentification, int periode, int[] personIds)
{
Periode = periode;
PersonIds = personIds;
JobIdentification = jobIdentification;
}
public string JobIdentification { get; set; }
public int Periode { get; private set; }
public int[] PersonIds { get; set; }
}
}
namespace Prototype.Messages
{
public class StartWorkerMessage
{
public StartWorkerMessage(string jobIdentification, int periode, int personId)
{
JobIdentification = jobIdentification;
Periode = periode;
PersonId = personId;
}
public string JobIdentification { get; set; }
public int Periode { get; private set; }
public int PersonId { get; private set; }
}
}
using System;
using Akka.Actor;
using Prototype.Common.Messages;
using Prototype.Messages;
using Prototype.WorkItems;
namespace Prototype.Actors
{
internal class WorkerActor : ReceiveActor
{
private readonly IWorkItemFactory _workItemFactory;
public WorkerActor(IWorkItemFactory workItemFactory)
{
_workItemFactory = workItemFactory;
Receive<StartWorkerMessage>(message => HandleCalculatePersonSteuerMessage(message));
}
private void HandleCalculatePersonSteuerMessage(StartWorkerMessage message)
{
var workItem = _workItemFactory.Create<StartWorkerMessage>("Steuerrechnung");
try
{
workItem.Process(message);
Context.ActorSelection("akka.tcp://MonitorActorSystem@127.0.0.1:8091/user/JobMonitor")
.Tell(new JobMonitorProgressMessage(message.JobIdentification, message.PersonId, "Completed", GetActorName()));
}
catch (Exception ex)
{
Context.ActorSelection("akka.tcp://MonitorActorSystem@127.0.0.1:8091/user/JobMonitor")
.Tell(new JobMonitorProgressMessage(message.JobIdentification, message.PersonId, "Error", GetActorName(), ex.Message));
}
Context.Sender.Tell(new WorkerCompletedMessage(message.JobIdentification, message.PersonId));
}
private string GetActorName()
{
return Self.Path.ToStringWithAddress().Substring(Self.Path.ToStringWithAddress().IndexOf("/user", StringComparison.Ordinal) + 5);
}
}
}
namespace Prototype.Messages
{
public class WorkerCompletedMessage
{
public WorkerCompletedMessage(string jobIdentification, int personId)
{
PersonId = personId;
JobIdentification = jobIdentification;
}
public string JobIdentification { get; set; }
public int PersonId { get; private set; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.