Skip to content

Instantly share code, notes, and snippets.

@johannesegger
Last active January 23, 2017 05:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save johannesegger/3e84a17728813efd8b56cad7c044dbb9 to your computer and use it in GitHub Desktop.
Save johannesegger/3e84a17728813efd8b56cad7c044dbb9 to your computer and use it in GitHub Desktop.
/// <summary>
/// Tests for <c>ObservableExtensions.BackOffAndRetry</c>: retry count, back-off timing,
/// and normal completion after a transient failure.
/// </summary>
public class BackOffAndRetryTests
{
    /// <summary>
    /// A source that always fails is resubscribed while the predicate allows it, and the
    /// last error is propagated afterwards.
    /// </summary>
    [Fact]
    public void BackOffAndRetryShouldRetry()
    {
        var result = new List<string>();

        Observable
            .Create<string>(obs =>
            {
                obs.OnNext("1");
                obs.OnNext("2");
                obs.OnError(new Exception("3"));
                return Disposable.Empty;
            })
            .BackOffAndRetry(_ => TimeSpan.Zero, (i, e) => i < 3)
            .Subscribe(result.Add, e => result.Add("error " + e.Message), () => result.Add("completed"));

        // The predicate accepts attempts 0, 1 and 2, so the source is subscribed four times
        // before the error is surfaced. Equal() compares element by element, so the order of
        // the notifications is verified as well (ShouldBeEquivalentTo ignores order by default).
        result.Should().Equal(new[] { "1", "2", "1", "2", "1", "2", "1", "2", "error 3" });
    }

    /// <summary>
    /// The delay returned by the strategy is applied before each resubscription.
    /// </summary>
    [Fact]
    public async Task BackOffAndRetryShouldBackOff()
    {
        var result = new List<DateTime>();
        var tcs = new TaskCompletionSource<Unit>();

        Observable
            .Create<string>(obs =>
            {
                obs.OnNext("1");
                obs.OnNext("2");
                obs.OnError(new Exception("3"));
                return Disposable.Empty;
            })
            .BackOffAndRetry(i => TimeSpan.FromSeconds(i), (i, e) => i < 3)
            // UtcNow: only differences between timestamps are asserted, and UTC is not
            // affected by DST or time-zone changes during the run.
            .Select(i => DateTime.UtcNow)
            .Subscribe(
                result.Add,
                e => { result.Add(DateTime.UtcNow); tcs.SetResult(Unit.Default); },
                () => { result.Add(DateTime.UtcNow); tcs.SetResult(Unit.Default); });

        // Retries are delayed, so wait for the terminal notification before asserting.
        await tcs.Task;

        // Entries 0-7 are the two values of each of the four subscriptions, entry 8 is the
        // terminal notification. Values within one subscription arrive back to back; the gaps
        // between subscriptions are the 1 s, 2 s and 3 s requested by the strategy.
        (result[1] - result[0]).Should().BeCloseTo(TimeSpan.Zero, 20);
        (result[2] - result[1]).Should().BeCloseTo(TimeSpan.FromSeconds(1), 20);
        (result[3] - result[2]).Should().BeCloseTo(TimeSpan.Zero, 20);
        (result[4] - result[3]).Should().BeCloseTo(TimeSpan.FromSeconds(2), 20);
        (result[5] - result[4]).Should().BeCloseTo(TimeSpan.Zero, 20);
        (result[6] - result[5]).Should().BeCloseTo(TimeSpan.FromSeconds(3), 20);
        (result[7] - result[6]).Should().BeCloseTo(TimeSpan.Zero, 20);
        (result[8] - result[7]).Should().BeCloseTo(TimeSpan.Zero, 20);
    }

    /// <summary>
    /// When a retried subscription succeeds, its values and completion are forwarded and
    /// no error is reported.
    /// </summary>
    [Fact]
    public void BackOffAndRetryShouldBeAbleToCompleteWithoutError()
    {
        var result = new List<string>();
        var fail = true; // only the first subscription fails

        Observable
            .Create<string>(obs =>
            {
                obs.OnNext("1");
                obs.OnNext("2");
                if (fail)
                {
                    fail = false;
                    obs.OnError(new Exception("3"));
                }
                else
                {
                    obs.OnNext("4");
                    obs.OnCompleted();
                }
                return Disposable.Empty;
            })
            .BackOffAndRetry(i => TimeSpan.Zero, (i, e) => i < 1)
            .Subscribe(result.Add, e => result.Add("error " + e.Message), () => result.Add("completed"));

        // Order-sensitive comparison: first (failed) attempt, then the successful one.
        result.Should().Equal(new[] { "1", "2", "1", "2", "4", "completed" });
    }
}
/// <summary>
/// Rx extension methods for resubscribing to a failed sequence after a back-off delay.
/// </summary>
public static class ObservableExtensions
{
    /// <summary>
    /// Builds the sequence for one subscription attempt and, on failure, chains the next attempt.
    /// </summary>
    /// <param name="source">The sequence to (re)subscribe to.</param>
    /// <param name="strategy">Returns the delay to wait before the given attempt (attempt &gt;= 1).</param>
    /// <param name="retryOnError">Decides whether the error of the given attempt triggers another attempt.</param>
    /// <param name="attempt">Zero-based number of the current attempt; 0 is the initial subscription.</param>
    private static IObservable<T> BackOffAndRetry<T>(
        this IObservable<T> source,
        Func<int, TimeSpan> strategy,
        Func<int, Exception, bool> retryOnError,
        int attempt)
    {
        // Defer so that the strategy is evaluated when the attempt is subscribed,
        // not when the pipeline is composed.
        return Observable.Defer(() =>
        {
            // The initial subscription is never delayed; the strategy is only asked for retries.
            var delay = attempt == 0 ? TimeSpan.Zero : strategy(attempt);

            // Non-positive delays mean "resubscribe immediately". A negative value must not be
            // passed to DelaySubscription, which rejects it and would replace the original
            // error with an argument exception.
            var current = delay <= TimeSpan.Zero ? source : source.DelaySubscription(delay);

            return current.Catch<T, Exception>(e =>
            {
                if (retryOnError(attempt, e))
                {
                    return source.BackOffAndRetry(strategy, retryOnError, attempt + 1);
                }

                // Retries exhausted or rejected: surface the last error unchanged.
                return Observable.Throw<T>(e);
            });
        });
    }

    /// <summary>
    /// Resubscribes to <paramref name="source"/> when it fails, waiting before each retry.
    /// </summary>
    /// <typeparam name="T">The element type of the sequence.</typeparam>
    /// <param name="source">The sequence to subscribe to and retry on error.</param>
    /// <param name="strategy">
    /// Maps the retry number (1 for the first retry, 2 for the second, ...) to the delay applied
    /// before that retry. Zero or negative delays resubscribe immediately.
    /// </param>
    /// <param name="retryOnError">
    /// Called with the zero-based number of the attempt that failed and the error it produced;
    /// return <c>true</c> to retry, <c>false</c> to propagate the error to the subscriber.
    /// </param>
    /// <returns>
    /// A sequence that forwards the values of every attempt and terminates with the completion of
    /// a successful attempt or with the first error that <paramref name="retryOnError"/> rejects.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/>, <paramref name="strategy"/> or <paramref name="retryOnError"/> is null.
    /// </exception>
    public static IObservable<T> BackOffAndRetry<T>(
        this IObservable<T> source,
        Func<int, TimeSpan> strategy,
        Func<int, Exception, bool> retryOnError)
    {
        // Fail fast at composition time instead of surfacing an opaque error on subscription.
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (strategy == null)
        {
            throw new ArgumentNullException("strategy");
        }
        if (retryOnError == null)
        {
            throw new ArgumentNullException("retryOnError");
        }

        return source.BackOffAndRetry(strategy, retryOnError, 0);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment