Created
November 12, 2015 00:54
-
-
Save kentcb/771e002b18836244171e to your computer and use it in GitHub Desktop.
RetryWhile
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static class ObservableExtensions | |
{ | |
public static IObservable<TSource> RetryWhile<TSource>( | |
this IObservable<TSource> @this, | |
Func<TSource, bool> predicate) | |
{ | |
@this.AssertNotNull(nameof(@this)); | |
predicate.AssertNotNull(nameof(predicate)); | |
return Observable | |
.Create<TSource>( | |
observer => | |
{ | |
var disposable = new SerialDisposable(); | |
disposable.Disposable = DoRetryWhileSubscription(disposable, @this, observer, predicate); | |
return disposable; | |
}); | |
} | |
private static IDisposable DoRetryWhileSubscription<TSource>( | |
SerialDisposable subscription, | |
IObservable<TSource> source, | |
IObserver<TSource> observer, | |
Func<TSource, bool> predicate) | |
{ | |
var completed = false; | |
return source | |
.Subscribe( | |
x => | |
{ | |
var retry = false; | |
try | |
{ | |
retry = predicate(x); | |
} | |
catch (Exception ex) | |
{ | |
observer.OnError(ex); | |
} | |
if (!retry) | |
{ | |
observer.OnNext(x); | |
observer.OnCompleted(); | |
completed = true; | |
} | |
else | |
{ | |
subscription.Disposable = DoRetryWhileSubscription(subscription, source, observer, predicate); | |
} | |
}, | |
ex => | |
{ | |
subscription.Disposable = DoRetryWhileSubscription(subscription, source, observer, predicate); | |
}, | |
() => | |
{ | |
if (!completed) | |
{ | |
subscription.Disposable = DoRetryWhileSubscription(subscription, source, observer, predicate); | |
} | |
}); | |
} | |
} | |
public sealed class ObservableExtensionsFixture | |
{ | |
[Fact] | |
public void retry_while_throws_if_source_is_null() | |
{ | |
IObservable<Unit> source = null; | |
Assert.Throws<ArgumentNullException>(() => source.RetryWhile(x => true)); | |
} | |
[Fact] | |
public void retry_while_throws_if_predicate_is_null() | |
{ | |
Assert.Throws<ArgumentNullException>(() => Observable.Return(Unit.Default).RetryWhile(null)); | |
} | |
[Fact] | |
public async Task retry_while_retries_if_source_errors() | |
{ | |
var iteration = 0; | |
var source = Observable | |
.Return(Unit.Default) | |
.Select(_ => iteration) | |
.Do( | |
x => | |
{ | |
++iteration; | |
if (x < 10) | |
{ | |
throw new InvalidOperationException(); | |
} | |
}); | |
var result = await source | |
.RetryWhile(x => false) | |
.FirstAsync() | |
.TimeoutIfTooSlow(); | |
Assert.Equal(10, result); | |
} | |
[Fact] | |
public async Task retry_while_retries_if_predicate_returns_true() | |
{ | |
var iteration = 0; | |
var source = Observable | |
.Return(Unit.Default) | |
.Select(_ => ++iteration); | |
var result = await source | |
.RetryWhile(x => x < 10) | |
.FirstAsync() | |
.TimeoutIfTooSlow(); | |
Assert.Equal(10, result); | |
} | |
[Fact] | |
public async Task retry_while_retries_if_source_is_empty() | |
{ | |
var iteration = 0; | |
var source = Observable | |
.Return(Unit.Default) | |
.Select(_ => ++iteration) | |
.Select(x => x < 10 ? Observable.Empty<int>() : Observable.Return(x)) | |
.Switch(); | |
var result = await source | |
.RetryWhile(x => x < 20) | |
.FirstAsync() | |
.TimeoutIfTooSlow(); | |
Assert.Equal(20, result); | |
} | |
[Fact] | |
public async Task retry_while_ticks_errors_in_predicate() | |
{ | |
var source = Observable | |
.Return(Unit.Default); | |
var result = source | |
.RetryWhile(_ => { throw new InvalidOperationException(); }) | |
.FirstAsync() | |
.TimeoutIfTooSlow(); | |
await Assert.ThrowsAsync<InvalidOperationException>(async () => await result); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment