Last active
March 21, 2017 06:35
-
-
Save Danthar/82e89eff01ba1eb1d4c3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Groups the message types exchanged between the back-off supervisor and the child actor it watches.
/// </summary>
public abstract class BackoffProtocol
{
    /// <summary>
    /// Envelope handed to the child actor: the payload plus a freshly generated tracking id
    /// and the actor that originally sent the payload.
    /// The envelope is generic so the child actor does not have to do manual matching on the payload;
    /// the downside is that it limits the communication options with the child actor.
    /// </summary>
    /// <typeparam name="T">Type of the wrapped payload.</typeparam>
    [Serializable]
    public sealed class TrackedMsg<T>
    {
        /// <summary>
        /// Wraps <paramref name="msg"/> and assigns it a new <see cref="Guid"/> as tracking id.
        /// </summary>
        /// <param name="msg">The payload to track.</param>
        /// <param name="sender">The actor the payload came from.</param>
        public TrackedMsg(T msg, IActorRef sender)
        {
            this.Id = Guid.NewGuid();
            this.Msg = msg;
            this.Sender = sender;
        }

        /// <summary>Tracking id generated when the envelope was created.</summary>
        public Guid Id { get; private set; }

        /// <summary>The wrapped payload.</summary>
        public T Msg { get; private set; }

        /// <summary>The actor the payload came from.</summary>
        public IActorRef Sender { get; private set; }
    }

    /// <summary>
    /// Acknowledgement carrying the id of a tracked message.
    /// </summary>
    [Serializable]
    public sealed class Sent
    {
        /// <summary>
        /// Creates an acknowledgement for the tracked message with the given id.
        /// </summary>
        /// <param name="id">Id of the tracked message being acknowledged.</param>
        public Sent(Guid id)
        {
            this.Id = id;
        }

        /// <summary>Id of the tracked message being acknowledged.</summary>
        public Guid Id { get; private set; }
    }
}
/// <summary>
/// Actor used to supervise actors with ability to restart them after back-off timeout occurred.
/// Incoming messages of type <typeparamref name="T"/> are wrapped in a
/// <see cref="BackoffProtocol.TrackedMsg{T}"/>, buffered, and forwarded to the child. A message stays
/// buffered until a <see cref="BackoffProtocol.Sent"/> with its id is received; buffered messages are
/// re-sent to the child on every retry tick.
/// </summary>
/// <typeparam name="T">Type of the messages that are tracked and forwarded to the child.</typeparam>
public class BackoffSupervisor<T> : ReceiveActor
{
    #region Messages

    /// <summary>
    /// Request <see cref="BackoffSupervisor{T}"/> with this message to receive <see cref="CurrentChild"/> response with current child.
    /// </summary>
    [Serializable]
    public sealed class GetCurrentChild
    {
        public static readonly GetCurrentChild Instance = new GetCurrentChild();

        private GetCurrentChild() { }
    }

    /// <summary>
    /// Reply to <see cref="GetCurrentChild"/>; carries the reference of the currently supervised child.
    /// </summary>
    [Serializable]
    public sealed class CurrentChild
    {
        public readonly IActorRef Ref;

        public CurrentChild(IActorRef @ref)
        {
            Ref = @ref;
        }
    }

    /// <summary>
    /// Internal self-message that triggers re-sending of the buffered messages.
    /// </summary>
    [Serializable]
    private sealed class Tick
    {
        public static readonly Tick Instance = new Tick();

        private Tick() { }
    }

    #endregion

    private readonly Props _childProps;
    private readonly string _childName;
    private readonly ExponentiaBackoff _backoff;

    // Buffer for messages which we are currently handling (not yet acknowledged with Sent).
    private readonly List<BackoffProtocol.TrackedMsg<T>> _possiblyFailed;

    private IActorRef _child = null;

    // True while a Tick has been scheduled but not yet processed.
    private bool _retryScheduled;

    /// <summary>
    /// Creates the supervisor and registers its message handlers. The child itself is started in <see cref="PreStart"/>.
    /// </summary>
    /// <param name="childProps">Props used to create (and re-create) the child actor.</param>
    /// <param name="childName">Name given to the child actor.</param>
    /// <param name="minBackoff">Smallest retry delay; must be greater than zero.</param>
    /// <param name="maxBackoff">Upper bound used for the retry delay; must not be smaller than <paramref name="minBackoff"/>.</param>
    /// <param name="randomFactor">Jitter factor between 0.0 and 1.0 applied to the delay.</param>
    /// <exception cref="ArgumentException">Thrown when one of the back-off arguments is out of range.</exception>
    public BackoffSupervisor(Props childProps, string childName, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor)
    {
        if (minBackoff <= TimeSpan.Zero)
        {
            throw new ArgumentException("MinBackoff must be greater than 0");
        }

        if (maxBackoff < minBackoff)
        {
            throw new ArgumentException("MaxBackoff must be greater than MinBackoff");
        }

        if (randomFactor < 0.0 || randomFactor > 1.0)
        {
            throw new ArgumentException("RandomFactor must be between 0.0 and 1.0");
        }

        _childProps = childProps;
        _childName = childName;
        _backoff = new ExponentiaBackoff(minBackoff, maxBackoff, randomFactor);
        _possiblyFailed = new List<BackoffProtocol.TrackedMsg<T>>();
        Initialise();
    }

    /// <summary>
    /// Always returns the stopping strategy.
    /// </summary>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        // We only work with the stopping strategy, which is in line with the let-it-crash paradigm.
        // By overriding it here we eliminate the flexibility of providing your own.
        // You can remove this of course, so you can provide your own, for more flexibility.
        return Akka.Actor.SupervisorStrategy.StoppingStrategy;
    }

    /// <summary>
    /// Starts (and watches) the child before the supervisor begins processing messages.
    /// </summary>
    protected override void PreStart()
    {
        StartChildActor();
        base.PreStart();
    }

    /// <summary>
    /// Registers the receive handlers of the supervisor.
    /// </summary>
    protected void Initialise()
    {
        Receive<T>(message =>
        {
            var trackedMsg = new BackoffProtocol.TrackedMsg<T>(message, Sender);
            _possiblyFailed.Add(trackedMsg);

            if (_backoff.IsStarted())
            {
                // We are backing off: keep the message buffered and let the next Tick deliver it.
                ScheduleRetry();
            }
            else
            {
                _child.Forward(trackedMsg);
            }
        });

        Receive<Tick>(_ =>
        {
            // We could extend this code to have the trackedMsg include a retry counter
            // which we can monitor, and implement 'give-up' mechanics after X amount of retries.
            foreach (var trackedMsg in _possiblyFailed)
            {
                _child.Tell(trackedMsg, trackedMsg.Sender);
            }

            _retryScheduled = false;
        });

        Receive<Terminated>(terminated =>
        {
            if (_child != null && _child.Equals(terminated.ActorRef))
            {
                // Restart and schedule a retry according to the backoff algorithm.
                _child = Context.Watch(Context.ActorOf(_childProps, _childName));
                ScheduleRetry();
            }
        });

        Receive<BackoffProtocol.Sent>(sent =>
        {
            // Ack of monitored actor. Remove the successful message and reset the backoff.
            _possiblyFailed.RemoveAll(tracked => tracked.Id == sent.Id);
            _backoff.Reset();
        });

        Receive<GetCurrentChild>(_ =>
        {
            Sender.Tell(new CurrentChild(_child));
        });
    }

    /// <summary>
    /// Schedules a single <see cref="Tick"/> to self after the next back-off delay.
    /// Does nothing when a Tick is already pending.
    /// </summary>
    private void ScheduleRetry()
    {
        // A second pending Tick would re-send every buffered message twice, so at most one
        // Tick may be outstanding at any time.
        if (_retryScheduled)
        {
            return;
        }

        Context.System.Scheduler.ScheduleTellOnce(_backoff.NextDelay(), Self, Tick.Instance, Self);
        _retryScheduled = true;
    }

    /// <summary>
    /// Creates and watches the child if there is no child yet.
    /// </summary>
    private void StartChildActor()
    {
        if (_child == null)
        {
            _child = Context.Watch(Context.ActorOf(_childProps, _childName));
        }
    }
}
/// <summary>
/// Produces retry delays that double with every call to <see cref="NextDelay"/>, starting at the
/// minimum back-off, capped by the maximum back-off and multiplied by a random jitter factor.
/// </summary>
internal class ExponentiaBackoff
{
    private readonly TimeSpan _minBackoff;
    private readonly TimeSpan _maxBackoff;
    private readonly double _randomFactor;

    // Number of delays handed out since construction or the last Reset().
    private int _restartCount;

    /// <summary>
    /// Creates a back-off calculator. Arguments are stored as given; no validation happens here.
    /// </summary>
    /// <param name="minBackoff">Delay used as the base of the exponential growth.</param>
    /// <param name="maxBackoff">Cap applied to the exponential delay before jitter is applied.</param>
    /// <param name="randomFactor">Scale of the random jitter; the delay is multiplied by 1.0 plus a random share of this factor.</param>
    public ExponentiaBackoff(TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor)
    {
        _minBackoff = minBackoff;
        _maxBackoff = maxBackoff;
        _randomFactor = randomFactor;
    }

    /// <summary>
    /// Ends the current back-off sequence; the next delay starts again from the minimum back-off.
    /// </summary>
    public void Reset()
    {
        _restartCount = 0;
    }

    /// <summary>
    /// Tells whether at least one delay has been handed out since the last <see cref="Reset"/>.
    /// </summary>
    /// <returns><c>true</c> when a back-off sequence is in progress.</returns>
    public bool IsStarted()
    {
        return _restartCount > 0;
    }

    /// <summary>
    /// Computes the next delay and advances the back-off sequence by one step.
    /// </summary>
    /// <returns>
    /// The minimum back-off doubled once per previous call, capped at the maximum back-off and then
    /// multiplied by the jitter; the plain maximum back-off once 30 or more delays have been handed
    /// out or when the jittered value does not fit into a <see cref="TimeSpan"/>.
    /// </returns>
    public TimeSpan NextDelay()
    {
        var rand = 1.0 + ThreadLocalRandom.Current.NextDouble() * _randomFactor;
        TimeSpan restartDelay;

        if (_restartCount >= 30)
        {
            // Duration overflow protection: skip the exponent entirely and use the cap.
            restartDelay = _maxBackoff;
        }
        else
        {
            var ticks = Math.Min(_maxBackoff.Ticks, _minBackoff.Ticks * Math.Pow(2, _restartCount)) * rand;

            // The jitter can push the value past what a long can hold when the maximum back-off is
            // very large; converting such a double to long would overflow, so fall back to the cap.
            restartDelay = ticks >= long.MaxValue ? _maxBackoff : new TimeSpan((long)ticks);
        }

        _restartCount++;
        return restartDelay;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Your child actor would then look something like this: