Skip to content

Instantly share code, notes, and snippets.

@atifaziz
Forked from niik/RetryWithBackOffStrategy.cs
Last active February 15, 2021 15:02
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save atifaziz/c6776b936a36a98a8153 to your computer and use it in GitHub Desktop.
Save atifaziz/c6776b936a36a98a8153 to your computer and use it in GitHub Desktop.
// Licensed under the MIT license with <3 by GitHub
/// <summary>
/// The default back off strategy: waits the square of the attempt number, in seconds
/// (1 second, then 4, 9, 16...). Despite the name, the growth is quadratic rather than
/// strictly exponential.
/// </summary>
[SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")]
public static readonly Func<int, TimeSpan> ExponentialBackoff =
    attempt => TimeSpan.FromSeconds(Math.Pow(attempt, 2));
/// <summary>
/// Returns a cold observable which retries (re-subscribes to) the source observable on error up to the
/// specified number of times or until it successfully terminates. Allows for customizable back off strategy.
/// </summary>
/// <remarks>
/// The first attempt subscribes to the source immediately; attempt number <c>k</c> (for <c>k</c> &gt; 1)
/// delays its subscription by <c>strategy(k - 1)</c> on the given scheduler. The attempt counter is kept
/// per subscription, so every subscriber to the returned observable starts again from the first attempt.
/// </remarks>
/// <param name="source">The source observable.</param>
/// <param name="retryCount">The number of attempts of running the source observable before failing.</param>
/// <param name="strategy">The strategy to use in backing off, <see cref="ExponentialBackoff"/> by default.</param>
/// <param name="retryOnError">A predicate determining for which exceptions to retry. Defaults to all.</param>
/// <param name="scheduler">The scheduler used for the delayed re-subscriptions.</param>
/// <returns>
/// A cold observable which retries (re-subscribes to) the source observable on error up to the
/// specified number of times or until it successfully terminates.
/// </returns>
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
public static IObservable<T> RetryWithBackoffStrategy<T>(
    this IObservable<T> source,
    int retryCount = 3,
    Func<int, TimeSpan> strategy = null,
    Func<Exception, bool> retryOnError = null,
    IScheduler scheduler = null)
{
    strategy = strategy ?? ExponentialBackoff;
    scheduler = scheduler ?? RxApp.TaskpoolScheduler;
    if (retryOnError == null)
    {
        retryOnError = e => true;
    }

    // The outer Defer gives every subscription its own attempt counter. Without it the counter
    // would be shared by all subscribers, so a second subscription would start out already
    // "backed off" instead of subscribing to the source immediately.
    return Observable.Defer(() =>
    {
        int attempt = 0;

        return Observable.Defer(() =>
            {
                // First attempt runs right away; later attempts wait according to the strategy.
                var attemptSource = (++attempt == 1)
                    ? source
                    : source.DelaySubscription(strategy(attempt - 1), scheduler);

                return attemptSource
                    .Select(Notification.CreateOnNext)
                    // Retryable errors are rethrown so that Retry re-subscribes. Non-retryable errors
                    // are wrapped in an OnError notification, which passes through Retry as an
                    // ordinary value and is turned back into a real error by Dematerialize.
                    .Catch((Exception e) => retryOnError(e)
                        ? Observable.Throw<Notification<T>>(e)
                        : Observable.Return(Notification.CreateOnError<T>(e)));
            })
            .Retry(retryCount)
            .Dematerialize();
    });
}
// Licensed under the MIT license with <3 by GitHub
// Test back off strategy: the n-th retry waits n milliseconds.
static readonly Func<int, TimeSpan> LinearMsStrategy =
    attempt => TimeSpan.FromMilliseconds(attempt);
/// <summary>
/// A source that completes successfully must be subscribed to exactly once, even after
/// the scheduler has advanced past the first back off delay.
/// </summary>
[Fact]
public void DoesNotRetryInCaseOfSuccess()
{
    new TestScheduler().With(sched =>
    {
        int tryCount = 0;
        var source = Observable.Defer(() =>
        {
            tryCount++;
            return Observable.Return("yolo");
        });

        // Subscribe to the retrying observable, not to the raw source; otherwise the
        // operator under test is never exercised.
        var observable = source.RetryWithBackoffStrategy(
            retryCount: 3,
            strategy: LinearMsStrategy,
            scheduler: sched
        );
        observable.Subscribe();

        Assert.Equal(1, tryCount);
        sched.AdvanceByMs(1);
        Assert.Equal(1, tryCount);
    });
}
/// <summary>
/// When every attempt fails, the error reported to the subscriber is the one raised
/// by the final attempt.
/// </summary>
[Fact]
public void PropagatesLastObservedExceptionIfAllTriesFail()
{
    new TestScheduler().With(sched =>
    {
        int attempts = 0;

        // Each subscription fails with an exception whose message is the attempt number.
        var alwaysFailing = Observable.Defer(() =>
            Observable.Throw<string>(new InvalidOperationException((++attempts).ToString())));

        var retrying = alwaysFailing.RetryWithBackoffStrategy(
            retryCount: 3,
            strategy: LinearMsStrategy,
            scheduler: sched);

        Exception observedError = null;
        retrying.Subscribe(_ => { }, e => observedError = e);

        // First attempt is immediate.
        Assert.Equal(1, attempts);

        // Second attempt is delayed by 1 ms.
        sched.AdvanceByMs(1);
        Assert.Equal(2, attempts);

        // Third attempt is delayed by a further 2 ms.
        sched.AdvanceByMs(2);
        Assert.Equal(3, attempts);

        Assert.NotNull(observedError);
        Assert.Equal("3", observedError.Message);
    });
}
/// <summary>
/// A source that fails once and then succeeds is retried exactly once, and the value
/// from the successful attempt reaches the subscriber.
/// </summary>
[Fact]
public void RetriesOnceIfSuccessBeforeRetriesRunOut()
{
    new TestScheduler().With(sched =>
    {
        int attempts = 0;

        // Fails on the first subscription only; afterwards yields "yolo <attempt number>".
        var failsOnce = Observable.Defer(() =>
            attempts++ < 1
                ? Observable.Throw<string>(new InvalidOperationException())
                : Observable.Return("yolo " + attempts));

        var retrying = failsOnce.RetryWithBackoffStrategy(
            retryCount: 5,
            strategy: LinearMsStrategy,
            scheduler: sched);

        string received = null;
        retrying.Subscribe(value => received = value);

        // The immediate first attempt has failed; nothing has been delivered yet.
        Assert.Equal(1, attempts);
        Assert.Null(received);

        // After the 1 ms back off the second attempt succeeds.
        sched.AdvanceByMs(1);
        Assert.Equal(2, attempts);
        Assert.Equal("yolo 2", received);
    });
}
/// <summary>
/// Once the downstream subscription ends (here via <c>Take(2)</c>), the underlying
/// never-ending source stops producing side effects.
/// </summary>
[Fact]
public void UnsubscribingDisposesSource()
{
    new TestScheduler().With(sched =>
    {
        int ticks = -1;
        var oneMs = TimeSpan.FromMilliseconds(1);

        // Ticks every millisecond forever, bumping the counter on each tick.
        var endless = Observable.Defer(() =>
            Observable.Timer(oneMs, oneMs, sched)
                .Do(_ => ticks++)
                .Select(_ => Unit.Default));

        var retrying = endless.RetryWithBackoffStrategy(scheduler: sched);

        // Cold
        Assert.Equal(-1, ticks);

        var subscription = retrying
            .Take(2)
            .Subscribe();

        sched.AdvanceByMs(1);
        Assert.Equal(0, ticks);

        sched.AdvanceByMs(1);
        Assert.Equal(1, ticks);

        // Take(2) has completed, so further time must not produce more ticks.
        sched.AdvanceByMs(10);
        Assert.Equal(1, ticks);
    });
}
// Usage examples. The method body below is elided ("{ ... }") and the three "return"
// expressions are separate illustrative snippets written without terminating semicolons;
// they are not meant to compile as one unit.
public IObservable<IWebResponse> MakeWebRequest() { ... }
// Retry 4 times with default back off strategy for all exceptions
return Observable.Defer(() => MakeWebRequest())
.RetryWithBackoffStrategy(retryCount: 4)
// Retry 4 times with default back off strategy for all web exceptions
// (any other exception type is propagated without retrying)
return Observable.Defer(() => MakeWebRequest())
.RetryWithBackoffStrategy(retryCount: 4, retryOnError: e => e is WebException)
// Retry 4 times with default back off strategy on 202s
// NOTE(review): System.Net.WebException appears to expose Status/Response rather than a
// StatusCode property — confirm which exception type this snippet is meant to use.
return Observable.Defer(() => MakeWebRequest())
.RetryWithBackoffStrategy(retryCount: 4, retryOnError: e => e is WebException && ((WebException)e).StatusCode == 202)
@fidergo-stephane-gourichon

Thanks for this fork; it is indeed shorter (although the use of Dematerialize adds one more thing to figure out). See also my minor comment on the upstream gist about exponential backoff.

@SuperJMN
Copy link

Hey, thanks for the fork! What's the deal with Materialize? How does this work? It looks really streamlined, but I cannot figure out what's going on behind the scenes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment