Last active
October 2, 2021 01:42
-
-
Save niik/6696449 to your computer and use it in GitHub Desktop.
An Rx retry operator with a customizable back off strategy.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Licensed under the MIT license with <3 by GitHub
/// <summary>
/// The default back off strategy. Despite its name the delay grows quadratically rather than
/// exponentially: for the argument <c>n</c> it waits <c>n^2</c> seconds, i.e. 1 second, then 4, 9, 16...
/// </summary>
[SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")]
public static readonly Func<int, TimeSpan> ExponentialBackoff = n => TimeSpan.FromSeconds(Math.Pow(n, 2));
/// <summary>
/// Returns a cold observable which retries (re-subscribes to) the source observable on error up to the
/// specified number of times or until it successfully terminates. Allows for customizable back off strategy.
/// </summary>
/// <param name="source">The source observable.</param>
/// <param name="retryCount">The number of attempts of running the source observable before failing.</param>
/// <param name="strategy">
/// Maps the number of failed attempts so far (1 for the first retry) to the delay applied before
/// re-subscribing. Defaults to <c>ExponentialBackoff</c>.
/// </param>
/// <param name="retryOnError">
/// A predicate determining for which exceptions to retry. Defaults to all. An exception for which the
/// predicate returns <c>false</c> is propagated to the subscriber without any further retry.
/// </param>
/// <param name="scheduler">
/// The scheduler used to delay re-subscription. Defaults to <c>RxApp.TaskpoolScheduler</c>.
/// </param>
/// <returns>
/// A cold observable which retries (re-subscribes to) the source observable on error up to the
/// specified number of times or until it successfully terminates.
/// </returns>
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
public static IObservable<T> RetryWithBackoffStrategy<T>(
    this IObservable<T> source,
    int retryCount = 3,
    Func<int, TimeSpan> strategy = null,
    Func<Exception, bool> retryOnError = null,
    IScheduler scheduler = null)
{
    strategy = strategy ?? ExponentialBackoff;
    scheduler = scheduler ?? RxApp.TaskpoolScheduler;

    if (retryOnError == null)
    {
        retryOnError = e => true;
    }

    // The outer Defer gives every subscription its own attempt counter. Without it the counter would be
    // shared by all subscriptions, so a second subscriber would start out with a stale count and a delay.
    return Observable.Defer(() =>
    {
        int attempt = 0;

        return Observable.Defer(() =>
            {
                // The first attempt subscribes immediately; each later attempt is delayed by the strategy.
                var attemptSource = (++attempt == 1)
                    ? source
                    : source.DelaySubscription(strategy(attempt - 1), scheduler);

                return attemptSource
                    .Select(item => new Tuple<bool, T, Exception>(true, item, null))
                    // Retryable errors are re-thrown so that Retry below re-subscribes. Non-retryable
                    // errors are smuggled past Retry as a failure tuple and re-thrown in SelectMany.
                    .Catch<Tuple<bool, T, Exception>, Exception>(e => retryOnError(e)
                        ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                        : Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
            })
            .Retry(retryCount)
            .SelectMany(t => t.Item1
                ? Observable.Return(t.Item2)
                : Observable.Throw<T>(t.Item3));
    });
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Licensed under the MIT license with <3 by GitHub
/// <summary>
/// Test back off strategy which waits as many milliseconds as the value it is given.
/// </summary>
static readonly Func<int, TimeSpan> LinearMsStrategy = attempt => TimeSpan.FromMilliseconds(attempt);
/// <summary>
/// A source which succeeds on the first attempt must be subscribed to exactly once.
/// </summary>
[Fact]
public void DoesNotRetryInCaseOfSuccess()
{
    new TestScheduler().With(sched =>
    {
        int tryCount = 0;

        var source = Observable.Defer(() =>
        {
            tryCount++;
            return Observable.Return("yolo");
        });

        // Keep the operator's result and subscribe to it; subscribing to the raw source
        // would never exercise the retry logic.
        var observable = source.RetryWithBackoffStrategy(
            retryCount: 3,
            strategy: LinearMsStrategy,
            scheduler: sched);

        observable.Subscribe();

        Assert.Equal(1, tryCount);

        // Advancing time must not trigger another attempt.
        sched.AdvanceByMs(1);
        Assert.Equal(1, tryCount);
    });
}
/// <summary>
/// When every attempt fails, the exception raised by the final attempt is the one delivered to the subscriber.
/// </summary>
[Fact]
public void PropagatesLastObservedExceptionIfAllTriesFail()
{
    new TestScheduler().With(sched =>
    {
        int tryCount = 0;

        // Each attempt fails with an exception whose message is the attempt number.
        var source = Observable.Defer(() =>
        {
            tryCount++;
            return Observable.Throw<string>(new InvalidOperationException(tryCount.ToString()));
        });

        var observable = source.RetryWithBackoffStrategy(
            retryCount: 3,
            strategy: LinearMsStrategy,
            scheduler: sched);

        Exception lastError = null;
        observable.Subscribe(_ => { }, e => { lastError = e; });

        // The first attempt runs immediately on subscription.
        Assert.Equal(1, tryCount);

        // The second attempt is delayed by LinearMsStrategy(1) = 1 ms.
        sched.AdvanceByMs(1);
        Assert.Equal(2, tryCount);

        // The third attempt is delayed by LinearMsStrategy(2) = 2 ms.
        sched.AdvanceByMs(2);
        Assert.Equal(3, tryCount);

        // All three attempts are used up, so the last error must have been propagated.
        Assert.NotNull(lastError);
        Assert.Equal("3", lastError.Message);
    });
}
/// <summary>
/// A source that fails once and then succeeds is retried exactly one time and its value is delivered.
/// </summary>
[Fact]
public void RetriesOnceIfSuccessBeforeRetriesRunOut()
{
    new TestScheduler().With(sched =>
    {
        int tryCount = 0;

        // Fails on the first subscription only; later subscriptions yield a value.
        var source = Observable.Defer(() =>
        {
            bool isFirstAttempt = tryCount++ < 1;
            if (isFirstAttempt)
            {
                return Observable.Throw<string>(new InvalidOperationException());
            }

            return Observable.Return("yolo " + tryCount);
        });

        var retrying = source.RetryWithBackoffStrategy(
            retryCount: 5,
            strategy: LinearMsStrategy,
            scheduler: sched);

        string received = null;
        retrying.Subscribe(value => { received = value; });

        // The first attempt has failed and nothing has been delivered yet.
        Assert.Equal(1, tryCount);
        Assert.Null(received);

        // After the 1 ms back off the second attempt runs and succeeds.
        sched.AdvanceByMs(1);
        Assert.Equal(2, tryCount);
        Assert.Equal("yolo 2", received);
    });
}
/// <summary>
/// Once the downstream subscription ends (here through <c>Take(2)</c>) the underlying timer stops ticking.
/// </summary>
[Fact]
public void UnsubscribingDisposesSource()
{
    new TestScheduler().With(sched =>
    {
        int c = -1;
        var tick = TimeSpan.FromMilliseconds(1);

        // A timer that fires every millisecond and bumps the counter on every tick.
        var neverEndingSource = Observable.Defer(() =>
            Observable.Timer(tick, tick, sched)
                .Do(_ => c++)
                .Select(_ => Unit.Default));

        var retrying = neverEndingSource.RetryWithBackoffStrategy(scheduler: sched);

        // Cold: nothing has run before a subscription exists.
        Assert.Equal(-1, c);

        retrying
            .Take(2)
            .Subscribe();

        sched.AdvanceByMs(1);
        Assert.Equal(0, c);

        sched.AdvanceByMs(1);
        Assert.Equal(1, c);

        // Take(2) has completed, so no further ticks may be observed.
        sched.AdvanceByMs(10);
        Assert.Equal(1, c);
    });
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Usage examples. The three snippets are alternatives, not one method body.
public IObservable<IWebResponse> MakeWebRequest() { ... }

// Retry 4 times with default back off strategy for all exceptions
return Observable.Defer(() => MakeWebRequest())
    .RetryWithBackoffStrategy(retryCount: 4);

// Retry 4 times with default back off strategy, but only for web exceptions
return Observable.Defer(() => MakeWebRequest())
    .RetryWithBackoffStrategy(retryCount: 4, retryOnError: e => e is WebException);

// Retry 4 times with default back off strategy on 202s.
// WebException exposes no StatusCode itself; the status is read from its HTTP response, if there is one.
return Observable.Defer(() => MakeWebRequest())
    .RetryWithBackoffStrategy(retryCount: 4, retryOnError: e =>
    {
        var webException = e as WebException;
        var response = webException == null ? null : webException.Response as HttpWebResponse;
        return response != null && response.StatusCode == HttpStatusCode.Accepted;
    });
Thanks for sharing. One minor feedback: the ExponentialBackoff as implemented pow(n,2)
is not technically "exponential", it's "square" (of the number of retries). Exponential would be pow(2, n)
or, for that matter, pow(whatever, n)
.
FYI, I got sick of copying this from mobile project to mobile project, so I formalized it here.
As a pointer back, this was created based on a question "Rx back off and retry" on StackOverflow here: https://stackoverflow.com/questions/20189166/rx-back-off-and-retry
Hey, please correct the spelling: "ExpontentialBackoff". Also, you could replace the RxApp.TaskpoolScheduler
by Scheduler.Default
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I've created a fork that fixes the typo and breaking test issue I mentioned. I also simplified the implementation of the operator (my main motivation for the fork) by using
Notification
instead of a triple (tuple of 3). It makes the code more readable and one can then simply rely on Dematerialize
to either terminate with an error or push the result at the end.