Skip to content

Instantly share code, notes, and snippets.

@Danthar
Last active March 21, 2017 06:35
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Danthar/82e89eff01ba1eb1d4c3 to your computer and use it in GitHub Desktop.
Save Danthar/82e89eff01ba1eb1d4c3 to your computer and use it in GitHub Desktop.
/// <summary>
/// Groups the message types exchanged between a <see cref="BackoffSupervisor{T}"/> and the child it watches.
/// </summary>
public abstract class BackoffProtocol
{
    /// <summary>
    /// Envelope the supervisor wraps around every payload before handing it to the child.
    /// The envelope is generic so the child can register a handler for the concrete payload type
    /// instead of matching on the payload by hand. The trade-off is that it narrows the ways in
    /// which you can communicate with the child actor.
    /// </summary>
    /// <typeparam name="T">Type of the wrapped payload.</typeparam>
    [Serializable]
    public sealed class TrackedMsg<T>
    {
        /// <summary>
        /// Wraps <paramref name="msg"/> and stamps it with a freshly generated <see cref="Id"/>.
        /// </summary>
        /// <param name="msg">The payload to track.</param>
        /// <param name="sender">The actor that originally sent the payload.</param>
        public TrackedMsg(T msg, IActorRef sender)
        {
            this.Id = Guid.NewGuid();
            this.Msg = msg;
            this.Sender = sender;
        }

        /// <summary>Unique identifier generated when the envelope is constructed.</summary>
        public Guid Id { get; private set; }

        /// <summary>The wrapped payload.</summary>
        public T Msg { get; private set; }

        /// <summary>The actor that originally sent the payload.</summary>
        public IActorRef Sender { get; private set; }
    }

    /// <summary>
    /// Acknowledgement carrying the <see cref="Id"/> of a tracked message.
    /// </summary>
    [Serializable]
    public sealed class Sent
    {
        /// <summary>
        /// Creates an acknowledgement for the tracked message identified by <paramref name="id"/>.
        /// </summary>
        /// <param name="id">Identifier of the tracked message being acknowledged.</param>
        public Sent(Guid id)
        {
            this.Id = id;
        }

        /// <summary>Identifier of the tracked message being acknowledged.</summary>
        public Guid Id { get; private set; }
    }
}
/// <summary>
/// Supervises a single child actor. The child is recreated whenever it terminates, and every message
/// the child has not acknowledged with <see cref="BackoffProtocol.Sent"/> is re-delivered after a
/// back-off delay that grows with each retry.
/// </summary>
/// <typeparam name="T">Type of the messages that are tracked and passed on to the child.</typeparam>
public class BackoffSupervisor<T> : ReceiveActor
{
    #region Messages

    /// <summary>
    /// Send this message to a <see cref="BackoffSupervisor{T}"/> to receive a <see cref="CurrentChild"/>
    /// response holding the current child.
    /// </summary>
    [Serializable]
    public sealed class GetCurrentChild
    {
        public static readonly GetCurrentChild Instance = new GetCurrentChild();

        private GetCurrentChild() { }
    }

    /// <summary>
    /// Reply to <see cref="GetCurrentChild"/>; <see cref="Ref"/> is the supervisor's current child.
    /// </summary>
    [Serializable]
    public sealed class CurrentChild
    {
        public readonly IActorRef Ref;

        public CurrentChild(IActorRef @ref)
        {
            Ref = @ref;
        }
    }

    /// <summary>
    /// Internal message the supervisor schedules to itself; it triggers re-delivery of the buffered messages.
    /// </summary>
    [Serializable]
    private sealed class Tick
    {
        public static readonly Tick Instance = new Tick();

        private Tick() { }
    }

    #endregion

    private readonly Props _childProps;
    private readonly string _childName;
    private readonly ExponentiaBackoff _backoff;

    // Messages handed to the child that have not been acknowledged with BackoffProtocol.Sent yet.
    private readonly List<BackoffProtocol.TrackedMsg<T>> _possiblyFailed = new List<BackoffProtocol.TrackedMsg<T>>();

    private IActorRef _child;

    // True while a Tick is scheduled but has not been processed yet.
    private bool _retryScheduled;

    /// <summary>
    /// Creates the supervisor. The child itself is started in <see cref="PreStart"/>.
    /// </summary>
    /// <param name="childProps">Props used to create (and re-create) the child.</param>
    /// <param name="childName">Name given to the child actor.</param>
    /// <param name="minBackoff">Smallest retry delay; must be greater than zero.</param>
    /// <param name="maxBackoff">Cap for the retry delay before jitter; must not be smaller than <paramref name="minBackoff"/>.</param>
    /// <param name="randomFactor">Jitter factor between 0.0 and 1.0 (inclusive).</param>
    /// <exception cref="ArgumentNullException"><paramref name="childProps"/> is null.</exception>
    /// <exception cref="ArgumentException">One of the back-off arguments is out of range.</exception>
    public BackoffSupervisor(Props childProps, string childName, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor)
    {
        if (childProps == null)
        {
            throw new ArgumentNullException("childProps");
        }

        if (minBackoff <= TimeSpan.Zero)
        {
            throw new ArgumentException("MinBackoff must be greater than 0", "minBackoff");
        }

        if (maxBackoff < minBackoff)
        {
            throw new ArgumentException("MaxBackoff must be greater than or equal to MinBackoff", "maxBackoff");
        }

        // Written as a negated range check so that double.NaN is rejected as well.
        if (!(randomFactor >= 0.0 && randomFactor <= 1.0))
        {
            throw new ArgumentException("RandomFactor must be between 0.0 and 1.0", "randomFactor");
        }

        _childProps = childProps;
        _childName = childName;
        _backoff = new ExponentiaBackoff(minBackoff, maxBackoff, randomFactor);
        Initialise();
    }

    /// <summary>
    /// Always returns the stopping strategy, so a failing child is stopped and the
    /// <see cref="Terminated"/> handler takes care of re-creating it.
    /// </summary>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        // We only work with the stopping strategy, which is in line with the let-it-crash paradigm.
        // Overriding it here removes the flexibility of providing your own strategy;
        // remove this override if you need that flexibility.
        return Akka.Actor.SupervisorStrategy.StoppingStrategy;
    }

    /// <summary>
    /// Starts the child before the supervisor begins processing messages.
    /// </summary>
    protected override void PreStart()
    {
        StartChildActor();
        base.PreStart();
    }

    /// <summary>
    /// Registers the message handlers of this actor.
    /// </summary>
    protected void Initialise()
    {
        // Handlers are matched in registration order. The protocol/control messages are registered
        // before the generic payload handler so that a broad T (for example object) cannot swallow them.
        Receive<Tick>(_ =>
        {
            // This could be extended with a retry counter on the tracked message, to implement
            // 'give-up' mechanics after a number of retries.
            foreach (var trackedMsg in _possiblyFailed)
            {
                _child.Tell(trackedMsg, trackedMsg.Sender);
            }

            _retryScheduled = false;
        });

        Receive<Terminated>(terminated =>
        {
            if (_child == null || !_child.Equals(terminated.ActorRef))
            {
                return;
            }

            // Re-create the child right away; the buffered messages are re-delivered on the next Tick.
            _child = Context.Watch(Context.ActorOf(_childProps, _childName));

            // Only one Tick may be pending at a time, otherwise each extra Tick would replay
            // the whole buffer again.
            if (!_retryScheduled)
            {
                ScheduleRetry();
            }
        });

        Receive<BackoffProtocol.Sent>(sent =>
        {
            // Ack from the monitored actor: drop the successful message and reset the back-off.
            _possiblyFailed.RemoveAll(tracked => tracked.Id == sent.Id);
            _backoff.Reset();
        });

        Receive<GetCurrentChild>(_ =>
        {
            Sender.Tell(new CurrentChild(_child));
        });

        Receive<T>(message =>
        {
            var trackedMsg = new BackoffProtocol.TrackedMsg<T>(message, Sender);
            _possiblyFailed.Add(trackedMsg);

            if (!_backoff.IsStarted())
            {
                // Normal operation: pass the message straight on to the child.
                _child.Forward(trackedMsg);
            }
            else if (!_retryScheduled)
            {
                // Backing off: keep the message buffered and make sure a retry is on its way.
                ScheduleRetry();
            }
        });
    }

    /// <summary>
    /// Schedules a single <see cref="Tick"/> to this actor after the next back-off delay.
    /// </summary>
    private void ScheduleRetry()
    {
        Context.System.Scheduler.ScheduleTellOnce(_backoff.NextDelay(), Self, Tick.Instance, Self);
        _retryScheduled = true;
    }

    /// <summary>
    /// Creates and watches the child if there is none yet.
    /// </summary>
    private void StartChildActor()
    {
        if (_child == null)
        {
            _child = Context.Watch(Context.ActorOf(_childProps, _childName));
        }
    }
}
/// <summary>
/// Computes retry delays that double with every call to <see cref="NextDelay"/> until they reach a
/// configured cap, with a random jitter applied on top.
/// </summary>
internal class ExponentiaBackoff
{
    // From this many consecutive delays onwards the cap is returned directly instead of computing 2^n.
    private const int MaxDoublings = 30;

    private readonly TimeSpan _minBackoff;
    private readonly TimeSpan _maxBackoff;
    private readonly double _randomFactor;

    // Number of delays handed out since construction or the last Reset().
    private int _restartCount;

    /// <summary>
    /// Creates a back-off calculator. The arguments are stored as given; no validation happens here.
    /// </summary>
    /// <param name="minBackoff">Delay used for the first retry, before jitter.</param>
    /// <param name="maxBackoff">Cap applied to the delay before jitter.</param>
    /// <param name="randomFactor">Upper bound of the random fraction added to each delay.</param>
    public ExponentiaBackoff(TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor)
    {
        _minBackoff = minBackoff;
        _maxBackoff = maxBackoff;
        _randomFactor = randomFactor;
    }

    /// <summary>
    /// Resets the sequence so that the next delay starts from the minimum again.
    /// </summary>
    public void Reset()
    {
        _restartCount = 0;
    }

    /// <summary>
    /// Returns true when at least one delay has been handed out since the last <see cref="Reset"/>.
    /// </summary>
    public bool IsStarted()
    {
        return _restartCount > 0;
    }

    /// <summary>
    /// Returns the next delay and advances the sequence.
    /// </summary>
    /// <returns>
    /// min(maxBackoff, minBackoff * 2^n) multiplied by a random factor in [1, 1 + randomFactor],
    /// where n is the number of delays already handed out. Because the jitter is applied after the
    /// cap, the result can exceed maxBackoff by up to randomFactor. Once n reaches 30, or when the
    /// jittered value does not fit into a <see cref="TimeSpan"/>, maxBackoff is returned without jitter.
    /// </returns>
    public TimeSpan NextDelay()
    {
        TimeSpan restartDelay;
        if (_restartCount >= MaxDoublings)
        {
            // Stop doubling here and use the cap as-is.
            restartDelay = _maxBackoff;
        }
        else
        {
            var jitter = 1.0 + ThreadLocalRandom.Current.NextDouble() * _randomFactor;
            var ticks = Math.Min(_maxBackoff.Ticks, _minBackoff.Ticks * Math.Pow(2, _restartCount)) * jitter;

            // Converting a double outside the long range (or NaN) to long gives an unspecified
            // result, which could surface as a negative delay. The negated comparison also
            // catches NaN.
            if (!(ticks < long.MaxValue))
            {
                restartDelay = _maxBackoff;
            }
            else
            {
                restartDelay = new TimeSpan((long)ticks);
            }
        }

        _restartCount++;
        return restartDelay;
    }
}
@Danthar
Copy link
Author

Danthar commented Nov 20, 2015

Your child actor would then look something like this:

  /// <summary>
  /// Example child actor: handles tracked string messages and acknowledges each one to its parent.
  /// </summary>
  public class Child : ReceiveActor
  {
      public Child()
      {
          Receive<BackoffProtocol.TrackedMsg<string>>(tracked =>
          {
              // Do something that sometimes fails.
              myExternalSystem.Execute(tracked.Msg);

              // Perhaps tell the sender something here, but at the very least
              // report back to the supervisor that the message was handled.
              var ack = new BackoffProtocol.Sent(tracked.Id);
              Context.Parent.Tell(ack);
          });
      }
  }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment