Skip to content

Instantly share code, notes, and snippets.

@kentcb
Created November 12, 2015 00:54
Show Gist options
  • Save kentcb/771e002b18836244171e to your computer and use it in GitHub Desktop.
Save kentcb/771e002b18836244171e to your computer and use it in GitHub Desktop.
RetryWhile
public static class ObservableExtensions
{
public static IObservable<TSource> RetryWhile<TSource>(
this IObservable<TSource> @this,
Func<TSource, bool> predicate)
{
@this.AssertNotNull(nameof(@this));
predicate.AssertNotNull(nameof(predicate));
return Observable
.Create<TSource>(
observer =>
{
var disposable = new SerialDisposable();
disposable.Disposable = DoRetryWhileSubscription(disposable, @this, observer, predicate);
return disposable;
});
}
private static IDisposable DoRetryWhileSubscription<TSource>(
SerialDisposable subscription,
IObservable<TSource> source,
IObserver<TSource> observer,
Func<TSource, bool> predicate)
{
var completed = false;
return source
.Subscribe(
x =>
{
var retry = false;
try
{
retry = predicate(x);
}
catch (Exception ex)
{
observer.OnError(ex);
}
if (!retry)
{
observer.OnNext(x);
observer.OnCompleted();
completed = true;
}
else
{
subscription.Disposable = DoRetryWhileSubscription(subscription, source, observer, predicate);
}
},
ex =>
{
subscription.Disposable = DoRetryWhileSubscription(subscription, source, observer, predicate);
},
() =>
{
if (!completed)
{
subscription.Disposable = DoRetryWhileSubscription(subscription, source, observer, predicate);
}
});
}
}
public sealed class ObservableExtensionsFixture
{
[Fact]
public void retry_while_throws_if_source_is_null()
{
IObservable<Unit> source = null;
Assert.Throws<ArgumentNullException>(() => source.RetryWhile(x => true));
}
[Fact]
public void retry_while_throws_if_predicate_is_null()
{
Assert.Throws<ArgumentNullException>(() => Observable.Return(Unit.Default).RetryWhile(null));
}
[Fact]
public async Task retry_while_retries_if_source_errors()
{
var iteration = 0;
var source = Observable
.Return(Unit.Default)
.Select(_ => iteration)
.Do(
x =>
{
++iteration;
if (x < 10)
{
throw new InvalidOperationException();
}
});
var result = await source
.RetryWhile(x => false)
.FirstAsync()
.TimeoutIfTooSlow();
Assert.Equal(10, result);
}
[Fact]
public async Task retry_while_retries_if_predicate_returns_true()
{
var iteration = 0;
var source = Observable
.Return(Unit.Default)
.Select(_ => ++iteration);
var result = await source
.RetryWhile(x => x < 10)
.FirstAsync()
.TimeoutIfTooSlow();
Assert.Equal(10, result);
}
[Fact]
public async Task retry_while_retries_if_source_is_empty()
{
var iteration = 0;
var source = Observable
.Return(Unit.Default)
.Select(_ => ++iteration)
.Select(x => x < 10 ? Observable.Empty<int>() : Observable.Return(x))
.Switch();
var result = await source
.RetryWhile(x => x < 20)
.FirstAsync()
.TimeoutIfTooSlow();
Assert.Equal(20, result);
}
[Fact]
public async Task retry_while_ticks_errors_in_predicate()
{
var source = Observable
.Return(Unit.Default);
var result = source
.RetryWhile(_ => { throw new InvalidOperationException(); })
.FirstAsync()
.TimeoutIfTooSlow();
await Assert.ThrowsAsync<InvalidOperationException>(async () => await result);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment