Skip to content

Instantly share code, notes, and snippets.

@niik
Last active October 2, 2021 01:42
Show Gist options
  • Star 13 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save niik/6696449 to your computer and use it in GitHub Desktop.
Save niik/6696449 to your computer and use it in GitHub Desktop.
An Rx retry operator with a customizable back off strategy.
// Licensed under the MIT license with <3 by GitHub
/// <summary>
/// The default back off strategy: waits n squared seconds before retry number n,
/// i.e. 1 second, then 4, 9, 16... Despite the name, the growth is quadratic.
/// </summary>
[SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")]
public static readonly Func<int, TimeSpan> ExponentialBackoff = n =>
{
    double delayInSeconds = Math.Pow(n, 2);
    return TimeSpan.FromSeconds(delayInSeconds);
};
/// <summary>
/// Returns a cold observable which retries (re-subscribes to) the source observable on error up to the
/// specified number of times or until it successfully terminates. Allows for customizable back off strategy.
/// </summary>
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
/// <param name="source">The source observable.</param>
/// <param name="retryCount">The number of attempts of running the source observable before failing.</param>
/// <param name="strategy">
/// Maps the number of failed attempts so far (1 before the second attempt, 2 before the third, ...)
/// to the delay applied before the next attempt. Defaults to <see cref="ExponentialBackoff"/>.
/// </param>
/// <param name="retryOnError">
/// A predicate determining for which exceptions to retry. Defaults to all. When it returns false the
/// exception is propagated to the subscriber without any further attempts.
/// </param>
/// <param name="scheduler">The scheduler used to delay re-subscription. Defaults to RxApp.TaskpoolScheduler.</param>
/// <returns>
/// A cold observable which retries (re-subscribes to) the source observable on error up to the
/// specified number of times or until it successfully terminates.
/// </returns>
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
public static IObservable<T> RetryWithBackoffStrategy<T>(
    this IObservable<T> source,
    int retryCount = 3,
    Func<int, TimeSpan> strategy = null,
    Func<Exception, bool> retryOnError = null,
    IScheduler scheduler = null)
{
    strategy = strategy ?? ExponentialBackoff;
    scheduler = scheduler ?? RxApp.TaskpoolScheduler;
    if (retryOnError == null)
    {
        retryOnError = e => true;
    }

    // The outer Defer gives every subscription its own attempt counter. Without it the
    // counter would be shared, so a second subscriber would start out already delayed.
    return Observable.Defer(() =>
    {
        int attempt = 0;

        return Observable.Defer(() =>
            {
                // The first attempt runs immediately; later attempts wait for the back off delay.
                var current = (++attempt == 1)
                    ? source
                    : source.DelaySubscription(strategy(attempt - 1), scheduler);

                return current
                    .Select(item => new Tuple<bool, T, Exception>(true, item, null))
                    // Retryable errors are re-thrown so that Retry re-subscribes. Non-retryable
                    // errors are wrapped in a value so they slip past Retry and are re-thrown below.
                    .Catch<Tuple<bool, T, Exception>, Exception>(e => retryOnError(e)
                        ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                        : Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
            })
            .Retry(retryCount)
            .SelectMany(t => t.Item1
                ? Observable.Return(t.Item2)
                : Observable.Throw<T>(t.Item3));
    });
}
// Licensed under the MIT license with <3 by GitHub
// Back off strategy for the tests: waits n milliseconds before retry number n.
static readonly Func<int, TimeSpan> LinearMsStrategy = retryNumber =>
{
    int delayInMs = 1 * retryNumber;
    return TimeSpan.FromMilliseconds(delayInMs);
};
/// <summary>
/// A source that completes successfully on the first attempt must be subscribed to exactly once,
/// even after the scheduler has advanced past the first back off delay.
/// </summary>
[Fact]
public void DoesNotRetryInCaseOfSuccess()
{
    new TestScheduler().With(sched =>
    {
        int tryCount = 0;
        var source = Observable.Defer(() =>
        {
            tryCount++;
            return Observable.Return("yolo");
        });

        // Subscribe to the operator's result, not to the raw source; otherwise the
        // operator under test is never exercised.
        var observable = source.RetryWithBackoffStrategy(
            retryCount: 3,
            strategy: LinearMsStrategy,
            scheduler: sched
        );
        observable.Subscribe();

        Assert.Equal(1, tryCount);
        sched.AdvanceByMs(1);
        Assert.Equal(1, tryCount);
    });
}
/// <summary>
/// When every attempt fails, the subscriber must receive the exception raised by the final attempt.
/// </summary>
[Fact]
public void PropagatesLastObservedExceptionIfAllTriesFail()
{
    new TestScheduler().With(sched =>
    {
        int tryCount = 0;
        var source = Observable.Defer(() =>
        {
            tryCount++;
            // The message carries the attempt number so the test can tell which error surfaced.
            return Observable.Throw<string>(new InvalidOperationException(tryCount.ToString()));
        });
        var observable = source.RetryWithBackoffStrategy(
            retryCount: 3,
            strategy: LinearMsStrategy,
            scheduler: sched
        );

        Exception lastError = null;
        observable.Subscribe(_ => {}, e => {lastError = e;});

        // First attempt runs immediately; the second after 1 ms, the third after a further 2 ms.
        Assert.Equal(1, tryCount);
        sched.AdvanceByMs(1);
        Assert.Equal(2, tryCount);
        sched.AdvanceByMs(2);
        Assert.Equal(3, tryCount);

        // All attempts are exhausted, so the last error must have been delivered.
        Assert.NotNull(lastError);
        Assert.Equal("3", lastError.Message);
    });
}
/// <summary>
/// A source that fails once and then succeeds must be retried exactly once, and the value
/// produced by the second attempt must reach the subscriber.
/// </summary>
[Fact]
public void RetriesOnceIfSuccessBeforeRetriesRunOut()
{
    new TestScheduler().With(sched =>
    {
        int subscriptions = 0;

        // Fails on the first subscription only; later subscriptions yield a single value.
        var flakySource = Observable.Defer(() => subscriptions++ < 1
            ? Observable.Throw<string>(new InvalidOperationException())
            : Observable.Return("yolo " + subscriptions));

        string received = null;
        flakySource
            .RetryWithBackoffStrategy(
                retryCount: 5,
                strategy: LinearMsStrategy,
                scheduler: sched)
            .Subscribe(value => received = value);

        // The first attempt has failed and the retry is still waiting on the scheduler.
        Assert.Equal(1, subscriptions);
        Assert.Null(received);

        sched.AdvanceByMs(1);

        Assert.Equal(2, subscriptions);
        Assert.Equal("yolo 2", received);
    });
}
/// <summary>
/// Once the subscriber has taken its two values, the underlying timer must stop being
/// observed: the tick counter may not move after that.
/// </summary>
[Fact]
public void UnsubscribingDisposesSource()
{
    new TestScheduler().With(sched =>
    {
        int ticks = -1;
        var oneMs = TimeSpan.FromMilliseconds(1);

        // Never completes on its own; bumps the counter on every tick.
        var neverEndingSource = Observable.Defer(() =>
            Observable.Timer(oneMs, oneMs, sched)
                .Do(_ => ticks++)
                .Select(_ => Unit.Default));

        var retrying = neverEndingSource.RetryWithBackoffStrategy(scheduler: sched);

        // Cold: nothing has run before anyone subscribes.
        Assert.Equal(-1, ticks);

        var subscription = retrying.Take(2).Subscribe();

        sched.AdvanceByMs(1);
        Assert.Equal(0, ticks);

        sched.AdvanceByMs(1);
        Assert.Equal(1, ticks);

        // Take(2) has completed, so further scheduler time must not produce more ticks.
        sched.AdvanceByMs(10);
        Assert.Equal(1, ticks);
    });
}
// Usage examples. MakeWebRequest stands in for any method returning a cold request
// observable; its body is elided here.
public IObservable<IWebResponse> MakeWebRequest() { ... }

// Retry 4 times with default back off strategy for all exceptions
return Observable.Defer(() => MakeWebRequest())
    .RetryWithBackoffStrategy(retryCount: 4);

// Retry 4 times with default back off strategy for all web exceptions
return Observable.Defer(() => MakeWebRequest())
    .RetryWithBackoffStrategy(retryCount: 4, retryOnError: e => e is WebException);

// Retry 4 times with default back off strategy on 202s.
// WebException has no StatusCode member; the code lives on the HttpWebResponse that
// WebException.Response exposes, and Response is not always an HttpWebResponse.
return Observable.Defer(() => MakeWebRequest())
    .RetryWithBackoffStrategy(retryCount: 4, retryOnError: e => e is WebException
        && ((WebException)e).Response is HttpWebResponse
        && ((HttpWebResponse)((WebException)e).Response).StatusCode == HttpStatusCode.Accepted);
@TheAngryByrd
Copy link

Where does the .CanRetry() come from?

@niik
Copy link
Author

niik commented Feb 14, 2014

@TheAngryByrd sorry about that. That's an internal extension method that we use. I've updated the gist.

@atifaziz
Copy link

There is a typo in the reference to ExponentialBackoff on line #30 of RetryWithBackoffStrategy.

@atifaziz
Copy link

PropagatesLastObservedExceptionIfAllTriesFail is failing at line 65, reading: Assert.Null(lastError). That should read Assert.NotNull(lastError) since there's an assertion following that tests the error's message.

@atifaziz
Copy link

I've created a fork that fixes the typo and breaking test issue I mentioned. I also simplified the implementation of the operator (my main motivation for the fork) by using Notification instead of a triple (tuple of 3). It makes the code more readable and one can then simply rely on Dematerialize to either terminate with an error or push the result at the end.

@fidergo-stephane-gourichon

Thanks for sharing. One minor piece of feedback: ExponentialBackoff, as implemented with pow(n, 2), is not technically "exponential"; it's quadratic (the square of the number of retries). An exponential strategy would be pow(2, n) or, for that matter, pow(whatever, n).

@kentcb
Copy link

kentcb commented Jun 4, 2016

FYI, I got sick of copying this from mobile project to mobile project, so I formalized it here.

@ColinDabritz
Copy link

As a pointer back, this was created based on a question "Rx back off and retry" on StackOverflow here: https://stackoverflow.com/questions/20189166/rx-back-off-and-retry

@SuperJMN
Copy link

Hey, please correct the spelling: "ExpontentialBackoff". Also, you could replace the RxApp.TaskpoolScheduler by Scheduler.Default

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment