Last active
August 29, 2015 14:07
-
-
Save makomweb/f881905e7f32369e1a20 to your computer and use it in GitHub Desktop.
Rx operator that reacts to a CancellationToken by ending the sequence with a TaskCanceledException
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class RxPlayground | |
{ | |
/// <summary>
/// Reactive Extensions helpers that tie an observable sequence to a
/// <see cref="CancellationToken"/>.
/// </summary>
/// <remarks>
/// NOTE(review): this static class is declared inside <c>RxPlayground</c>. C# only accepts
/// extension methods in non-nested static classes (compiler error CS1109), so the class has
/// to be moved to namespace scope for the <c>this</c> modifier below to compile.
/// </remarks>
public static class MyReactiveExtensions
{
    /// <summary>
    /// Mirrors <paramref name="source"/> until <paramref name="cancellationToken"/> is
    /// cancelled, at which point the returned sequence terminates with a
    /// <see cref="TaskCanceledException"/>.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence whose notifications are forwarded.</param>
    /// <param name="cancellationToken">Token whose cancellation is surfaced as an error.</param>
    /// <returns>
    /// A sequence that forwards the elements, completion and errors of
    /// <paramref name="source"/>, or fails with <see cref="TaskCanceledException"/> once the
    /// token is cancelled (immediately on subscription if it already is).
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static IObservable<T> ToCancellable<T>(this IObservable<T> source, CancellationToken cancellationToken)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        // The cancellation signal is created per subscription, so the token registration
        // lives exactly as long as the subscription and is released when it ends.
        var cancelled = Observable.Create<T>(observer =>
        {
            IDisposable registration = cancellationToken.Register(
                () => observer.OnError(new TaskCanceledException()));
            return registration;
        });

        // TakeUntil forwards an error raised by the signal sequence, yet still lets the
        // result complete when the source completes. Merging with a subject that never
        // calls OnCompleted (the previous approach) kept the result open forever.
        return source.TakeUntil(cancelled);
    }
}
/// <summary>
/// Sample worker whose "work" is a one-second timer, exposed through several variants
/// that differ in how they react to <see cref="Cancel"/>.
/// </summary>
public class Worker
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    /// <summary>
    /// Set to true by the work pipelines when the timer tick passes through them.
    /// </summary>
    public bool Succeeded { get; private set; }

    /// <summary>
    /// Runs the work without consulting the cancellation token.
    /// </summary>
    /// <returns>A sequence that yields true once the one-second timer fires.</returns>
    public IObservable<bool> Do()
    {
        return Observable.Timer(TimeSpan.FromSeconds(1))
            .Do(_ => Succeeded = true)
            .Select(_ => true);
    }

    /// <summary>
    /// Runs the work but stops listening to the timer once <see cref="Cancel"/> is called.
    /// </summary>
    /// <returns>
    /// A sequence that yields true when the timer fires first, or completes without any
    /// element when cancellation comes first.
    /// </returns>
    public IObservable<bool> DoUntil()
    {
        return Observable.Timer(TimeSpan.FromSeconds(1))
            .TakeUntil(IsCancelled)
            .Do(_ => Succeeded = true)
            .Select(_ => true);
    }

    /// <summary>
    /// Runs the work through <c>ToCancellable</c> with this worker's token, so that
    /// cancellation is reported as a <see cref="TaskCanceledException"/>.
    /// </summary>
    /// <returns>A sequence that yields true once the timer fires, unless cancelled first.</returns>
    public IObservable<bool> DoCancelled()
    {
        return Observable.Timer(TimeSpan.FromSeconds(1))
            .ToCancellable(_cts.Token)
            .Do(_ => Succeeded = true)
            .Select(_ => true);
    }

    /// <summary>
    /// Runs <see cref="Do"/> as a task that is bound to this worker's cancellation token.
    /// </summary>
    /// <returns>A task producing the last value of the <see cref="Do"/> sequence.</returns>
    public Task<bool> DoAsync()
    {
        return Do().ToTask(_cts.Token);
    }

    /// <summary>
    /// Requests cancellation on the worker's token source.
    /// </summary>
    public void Cancel()
    {
        _cts.Cancel();
    }

    /// <summary>
    /// Signal sequence that emits a single true only when the token is cancelled.
    /// </summary>
    /// <remarks>
    /// The signal must stay silent until cancellation, because TakeUntil stops on any
    /// element. Emitting the current IsCancellationRequested value on subscription
    /// (true or false) would end <see cref="DoUntil"/> straight away.
    /// </remarks>
    private IObservable<bool> IsCancelled
    {
        get
        {
            var token = _cts.Token;
            return Observable.Create<bool>(observer =>
            {
                // Register runs the callback at once if the token is already cancelled.
                IDisposable registration = token.Register(() =>
                {
                    observer.OnNext(true);
                    observer.OnCompleted();
                });
                return registration;
            });
        }
    }
}
/// <summary>
/// Timing-based tests that cancel a <c>Worker</c> after 500 ms, i.e. before its
/// one-second timer fires, and check how each work variant reacts.
/// </summary>
public class CancellationTest
{
    /// <summary>
    /// <c>Do()</c> is expected to yield true and flag success even though the worker
    /// is cancelled while the timer is still pending.
    /// </summary>
    [Fact]
    public async Task When_doing_async_then_should_succeed()
    {
        var sut = new Worker();
        ScheduleCancel(sut);

        var outcome = await sut.Do().FirstAsync();

        outcome.Should().BeTrue();
        sut.Succeeded.Should().BeTrue();
    }

    /// <summary>
    /// Awaiting <c>DoUntil()</c> as a task is expected to fail with
    /// <see cref="InvalidOperationException"/>.
    /// </summary>
    [Fact]
    public async Task When_doing_until_then_should_throw()
    {
        var sut = new Worker();
        ScheduleCancel(sut);

        var failure = await ThrowsAsync<InvalidOperationException>(() => sut.DoUntil().ToTask());

        failure.Should().NotBeNull();
    }

    /// <summary>
    /// Awaiting <c>DoCancelled()</c> as a task is expected to fail with
    /// <see cref="TaskCanceledException"/>.
    /// </summary>
    [Fact]
    public async Task When_cancelled_then_should_throw()
    {
        var sut = new Worker();
        ScheduleCancel(sut);

        var failure = await ThrowsAsync<TaskCanceledException>(() => sut.DoCancelled().ToTask());

        failure.Should().NotBeNull();
    }

    /// <summary>
    /// Invokes <paramref name="func"/>, awaits its task and asserts that it failed with an
    /// exception assignable to <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">Expected exception type.</typeparam>
    /// <param name="func">Factory for the task that is expected to fail.</param>
    /// <returns>The captured exception converted with <c>as T</c>.</returns>
    internal static async Task<T> ThrowsAsync<T>(Func<Task> func) where T : class
    {
        Exception caught = null;

        try
        {
            await func();
        }
        catch (Exception error)
        {
            // Captured instead of rethrown so the assertions below can inspect it.
            caught = error;
        }

        caught.Should().NotBeNull();
        caught.Should().BeAssignableTo<T>();
        return caught as T;
    }

    /// <summary>
    /// Starts a 500 ms timer that calls <c>Cancel()</c> on <paramref name="worker"/> when it fires.
    /// </summary>
    private static void ScheduleCancel(Worker worker)
    {
        Observable.Timer(TimeSpan.FromMilliseconds(500))
            .Subscribe(_ => worker.Cancel());
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment