Skip to content

Instantly share code, notes, and snippets.

@MichaelEvans
Forked from niik/RetryWithBackOffStrategy.cs
Last active August 29, 2015 14:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MichaelEvans/6d58c3833feccc627d9d to your computer and use it in GitHub Desktop.
Save MichaelEvans/6d58c3833feccc627d9d to your computer and use it in GitHub Desktop.
// Licensed under the MIT license with <3 by GitHub
/// <summary>
/// The default back off strategy: waits the square of the attempt number, in seconds
/// (1, 4, 9, 16, ...). Despite the name, the delay grows quadratically rather than exponentially.
/// </summary>
[SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")]
public static readonly Func<int, TimeSpan> ExponentialBackoff =
    attempt =>
    {
        double seconds = Math.Pow(attempt, 2);
        return TimeSpan.FromSeconds(seconds);
    };
/// <summary>
/// Returns a cold observable which retries (re-subscribes to) the source observable on error up to the
/// specified number of times or until it successfully terminates. Allows for customizable back off strategy.
/// </summary>
/// <param name="source">The source observable.</param>
/// <param name="retryCount">The number of attempts of running the source observable before failing.</param>
/// <param name="strategy">
/// Maps the number of failed attempts so far (1 for the first retry) to the delay applied before the
/// next subscription. Defaults to <see cref="ExponentialBackoff"/>.
/// </param>
/// <param name="retryOnError">A predicate determining for which exceptions to retry. Defaults to all</param>
/// <param name="scheduler">The scheduler used to delay re-subscriptions. Defaults to the task pool scheduler.</param>
/// <returns>
/// A cold observable which retries (re-subscribes to) the source observable on error up to the
/// specified number of times or until it successfully terminates.
/// </returns>
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
public static IObservable<T> RetryWithBackoffStrategy<T>(
    this IObservable<T> source,
    int retryCount = 3,
    Func<int, TimeSpan> strategy = null,
    Func<Exception, bool> retryOnError = null,
    IScheduler scheduler = null)
{
    strategy = strategy ?? ExponentialBackoff;
    scheduler = scheduler ?? RxApp.TaskpoolScheduler;
    if (retryOnError == null)
    {
        retryOnError = e => true;
    }

    // The outer Defer gives every subscriber its own attempt counter. Without it the counter is
    // shared by all subscriptions to the returned observable, so a second subscriber would start
    // with a stale count and delay its very first attempt.
    return Observable.Defer(() =>
    {
        int attempt = 0;

        return Observable.Defer(() =>
            {
                // The first attempt subscribes immediately; later ones wait for the back off delay.
                return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1), scheduler))
                    .Select(item => new Tuple<bool, T, Exception>(true, item, null))
                    // Retryable errors are re-thrown so that Retry re-subscribes. Non-retryable errors
                    // are wrapped as a value so Retry sees a normal completion and stops retrying.
                    .Catch<Tuple<bool, T, Exception>, Exception>(e => retryOnError(e)
                        ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                        : Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
            })
            .Retry(retryCount)
            // Unwrap: forward real items, and surface a wrapped non-retryable error as an error.
            .SelectMany(t => t.Item1
                ? Observable.Return(t.Item2)
                : Observable.Throw<T>(t.Item3));
    });
}
// Licensed under the MIT license with <3 by GitHub
/// <summary>
/// Test back off strategy: waits as many milliseconds as the number of failed attempts so far.
/// </summary>
static readonly Func<int, TimeSpan> LinearMsStrategy =
    failedAttempts => TimeSpan.FromMilliseconds(failedAttempts);
/// <summary>
/// A source that completes successfully on its first subscription must be subscribed to exactly once,
/// even after the back off delay has elapsed.
/// </summary>
[Fact]
public void DoesNotRetryInCaseOfSuccess()
{
    new TestScheduler().With(sched =>
    {
        int tryCount = 0;
        var source = Observable.Defer(() =>
        {
            tryCount++;
            return Observable.Return("yolo");
        });

        // Subscribe to the retrying observable, not to the raw source; otherwise the
        // operator under test is never exercised.
        var observable = source.RetryWithBackoffStrategy(
            retryCount: 3,
            strategy: LinearMsStrategy,
            scheduler: sched
        );
        observable.Subscribe();

        Assert.Equal(1, tryCount);

        // Advancing past the first back off interval must not trigger another attempt.
        sched.AdvanceByMs(1);
        Assert.Equal(1, tryCount);
    });
}
/// <summary>
/// When every attempt fails, the exception from the final attempt is the one delivered to the observer.
/// </summary>
[Fact]
public void PropagatesLastObservedExceptionIfAllTriesFail()
{
    new TestScheduler().With(sched =>
    {
        int tryCount = 0;
        var source = Observable.Defer(() =>
        {
            tryCount++;
            // The message carries the attempt number so the test can tell which error surfaced.
            return Observable.Throw<string>(new InvalidOperationException(tryCount.ToString()));
        });
        var observable = source.RetryWithBackoffStrategy(
            retryCount: 3,
            strategy: LinearMsStrategy,
            scheduler: sched
        );

        Exception lastError = null;
        observable.Subscribe(_ => {}, e => {lastError = e;});

        // First attempt runs immediately; the second after 1ms, the third after a further 2ms.
        Assert.Equal(1, tryCount);
        sched.AdvanceByMs(1);
        Assert.Equal(2, tryCount);
        sched.AdvanceByMs(2);
        Assert.Equal(3, tryCount);

        // All attempts are exhausted, so the error must have been propagated by now.
        Assert.NotNull(lastError);
        Assert.Equal("3", lastError.Message);
    });
}
/// <summary>
/// A source that fails once and then succeeds is subscribed to exactly twice, and the value
/// from the successful attempt reaches the observer.
/// </summary>
[Fact]
public void RetriesOnceIfSuccessBeforeRetriesRunOut()
{
    new TestScheduler().With(scheduler =>
    {
        int subscriptions = 0;

        // Fails on the first subscription only; afterwards yields a value tagged with the attempt number.
        var flakySource = Observable.Defer(() =>
        {
            subscriptions++;
            return subscriptions <= 1
                ? Observable.Throw<string>(new InvalidOperationException())
                : Observable.Return("yolo " + subscriptions);
        });

        var retrying = flakySource.RetryWithBackoffStrategy(
            retryCount: 5,
            strategy: LinearMsStrategy,
            scheduler: scheduler);

        string received = null;
        retrying.Subscribe(value => received = value);

        // The first attempt has failed and nothing has been emitted yet.
        Assert.Equal(1, subscriptions);
        Assert.Null(received);

        // After the 1ms back off the second attempt runs and succeeds.
        scheduler.AdvanceByMs(1);
        Assert.Equal(2, subscriptions);
        Assert.Equal("yolo 2", received);
    });
}
/// <summary>
/// Once the downstream subscription ends (here via Take(2)), the underlying source must stop ticking.
/// </summary>
[Fact]
public void UnsubscribingDisposesSource()
{
    new TestScheduler().With(scheduler =>
    {
        int ticks = -1;
        var oneMillisecond = TimeSpan.FromMilliseconds(1);

        // A timer that never completes; every tick bumps the counter as a side effect.
        var endlessSource = Observable.Defer(() =>
            Observable.Timer(oneMillisecond, oneMillisecond, scheduler)
                .Do(_ => ticks++)
                .Select(_ => Unit.Default));

        var retrying = endlessSource.RetryWithBackoffStrategy(scheduler: scheduler);

        // Cold: nothing has run before anyone subscribes.
        Assert.Equal(-1, ticks);

        var subscription = retrying.Take(2).Subscribe();

        scheduler.AdvanceByMs(1);
        Assert.Equal(0, ticks);

        scheduler.AdvanceByMs(1);
        Assert.Equal(1, ticks);

        // Take(2) has completed, so further time must not produce more ticks.
        scheduler.AdvanceByMs(10);
        Assert.Equal(1, ticks);
    });
}
// Usage examples. Each `return` below is an independent snippet, not one method body.
public IObservable<IWebResponse> MakeWebRequest() { ... }
// Retry 4 times with default back off strategy for all exceptions
return Observable.Defer(() => MakeWebRequest())
    .RetryWithBackoffStrategy(retryCount: 4);
// Retry 4 times with default back off strategy for all web exceptions
return Observable.Defer(() => MakeWebRequest())
    .RetryWithBackoffStrategy(retryCount: 4, retryOnError: e => e is WebException);
// Retry 4 times with default back off strategy on 202s
// WebException has no StatusCode property of its own; the HTTP status code is read from its
// Response once that is cast to HttpWebResponse (Response may be null or another response type).
return Observable.Defer(() => MakeWebRequest())
    .RetryWithBackoffStrategy(
        retryCount: 4,
        retryOnError: e =>
        {
            var webError = e as WebException;
            var httpResponse = webError == null ? null : webError.Response as HttpWebResponse;
            return httpResponse != null && (int)httpResponse.StatusCode == 202;
        });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment