Skip to content

Instantly share code, notes, and snippets.

@makomweb
Last active August 29, 2015 14:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save makomweb/f881905e7f32369e1a20 to your computer and use it in GitHub Desktop.
Save makomweb/f881905e7f32369e1a20 to your computer and use it in GitHub Desktop.
Rx operator which evaluates cancellation
public class RxPlayground
{
/// <summary>
/// Reactive extension operators that make an observable sequence react to a
/// <see cref="CancellationToken"/>.
/// </summary>
/// <remarks>
/// NOTE(review): this static class is declared inside another class. The C# compiler only
/// accepts extension methods in top-level static classes (CS1109), so confirm that this
/// type is hoisted out of its enclosing class wherever the code is actually built.
/// </remarks>
public static class MyReactiveExtensions
{
    /// <summary>
    /// Mirrors <paramref name="source"/> until <paramref name="cancellationToken"/> is cancelled;
    /// on cancellation the resulting sequence terminates with a <see cref="TaskCanceledException"/>.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to forward.</param>
    /// <param name="cancellationToken">Token whose cancellation fails the resulting sequence.</param>
    /// <returns>
    /// A sequence that forwards the notifications of <paramref name="source"/>, completes when
    /// <paramref name="source"/> completes, and signals <see cref="TaskCanceledException"/> through
    /// OnError when the token is cancelled first.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static IObservable<T> ToCancellable<T>(this IObservable<T> source, CancellationToken cancellationToken)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        // Signal sequence: never produces an element and never completes; it only fails
        // once the token is cancelled. The token registration is created per subscription
        // and returned as the subscription's disposable, so it is released as soon as the
        // subscriber unsubscribes or the sequence terminates (no lingering callbacks).
        var cancelled = Observable.Create<T>(observer =>
        {
            IDisposable registration = cancellationToken.Register(
                () => observer.OnError(new TaskCanceledException()));
            return registration;
        });

        // TakeUntil propagates an error raised by the signal sequence and otherwise ends
        // together with the source. Merging with a never-completing subject (the previous
        // approach) kept the result open forever when the source completed normally.
        return source.TakeUntil(cancelled);
    }
}
/// <summary>
/// Sample unit of work that takes one second and can be cancelled through <see cref="Cancel"/>.
/// Each method shows a different way of combining the work with the cancellation token.
/// </summary>
public class Worker
{
    // Duration of the simulated work; shared by every variant below.
    private static readonly TimeSpan WorkDuration = TimeSpan.FromSeconds(1);

    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    /// <summary>True once one of the work sequences has produced its value.</summary>
    public bool Succeeded { get; private set; }

    /// <summary>
    /// Runs the work without looking at the cancellation token.
    /// </summary>
    /// <returns>A sequence that sets <see cref="Succeeded"/> and emits <c>true</c> when the timer fires.</returns>
    public IObservable<bool> Do()
    {
        return Observable.Timer(WorkDuration)
            .Do(_ => Succeeded = true)
            .Select(_ => true);
    }

    /// <summary>
    /// Runs the work until <see cref="Cancel"/> is called.
    /// </summary>
    /// <returns>
    /// A sequence that emits <c>true</c> when the timer fires, or completes without any element
    /// when cancellation happens first.
    /// </returns>
    public IObservable<bool> DoUntil()
    {
        return Observable.Timer(WorkDuration)
            .TakeUntil(IsCancelled)
            .Do(_ => Succeeded = true)
            .Select(_ => true);
    }

    /// <summary>
    /// Runs the work through <c>ToCancellable</c> with this worker's token, so cancellation
    /// surfaces as a <see cref="TaskCanceledException"/> on the sequence.
    /// </summary>
    /// <returns>A sequence that emits <c>true</c> when the timer fires, or fails when cancelled first.</returns>
    public IObservable<bool> DoCancelled()
    {
        return Observable.Timer(WorkDuration)
            .ToCancellable(_cts.Token)
            .Do(_ => Succeeded = true)
            .Select(_ => true);
    }

    /// <summary>
    /// Runs <see cref="Do"/> as a task, passing this worker's token to <c>ToTask</c>.
    /// </summary>
    /// <returns>A task for the result of the work.</returns>
    public Task<bool> DoAsync()
    {
        return Do().ToTask(_cts.Token);
    }

    /// <summary>Requests cancellation of the work.</summary>
    public void Cancel()
    {
        _cts.Cancel();
    }

    /// <summary>
    /// Signal sequence for <see cref="DoUntil"/>: emits a single <c>true</c> at the moment the
    /// token is cancelled (immediately on subscription if it already is) and stays silent
    /// before that.
    /// </summary>
    private IObservable<bool> IsCancelled
    {
        get
        {
            // Observable.Return(_cts.IsCancellationRequested) emitted a value right away
            // (true OR false), and TakeUntil stops on any element, so DoUntil always ended
            // empty regardless of cancellation. Emit only when the token actually fires.
            return Observable.Create<bool>(observer =>
            {
                IDisposable registration = _cts.Token.Register(() =>
                {
                    observer.OnNext(true);
                    observer.OnCompleted();
                });
                return registration;
            });
        }
    }
}
/// <summary>
/// Tests for the cancellation variants of <c>Worker</c>. Every test requests cancellation
/// 500 ms after the worker is created and then awaits one of the worker's sequences.
/// </summary>
public class CancellationTest
{
    /// <summary>
    /// <c>Do()</c> is awaited while cancellation is requested in the background; the test
    /// expects the value <c>true</c> and the worker's <c>Succeeded</c> flag to be set.
    /// </summary>
    [Fact]
    public async Task When_doing_async_then_should_succeed()
    {
        Worker sut = CreateWorkerWithPendingCancellation();

        bool outcome = await sut.Do().FirstAsync();

        outcome.Should().BeTrue();
        sut.Succeeded.Should().BeTrue();
    }

    /// <summary>
    /// Converting <c>DoUntil()</c> to a task is expected to fault with
    /// <see cref="InvalidOperationException"/>.
    /// </summary>
    [Fact]
    public async Task When_doing_until_then_should_throw()
    {
        Worker sut = CreateWorkerWithPendingCancellation();

        InvalidOperationException failure =
            await ThrowsAsync<InvalidOperationException>(() => sut.DoUntil().ToTask());

        failure.Should().NotBeNull();
    }

    /// <summary>
    /// Converting <c>DoCancelled()</c> to a task is expected to fault with
    /// <see cref="TaskCanceledException"/>.
    /// </summary>
    [Fact]
    public async Task When_cancelled_then_should_throw()
    {
        Worker sut = CreateWorkerWithPendingCancellation();

        TaskCanceledException failure =
            await ThrowsAsync<TaskCanceledException>(() => sut.DoCancelled().ToTask());

        failure.Should().NotBeNull();
    }

    /// <summary>
    /// Awaits <paramref name="func"/> and asserts that it throws an exception assignable to
    /// <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">Expected exception type.</typeparam>
    /// <param name="func">Asynchronous operation expected to fail.</param>
    /// <returns>The caught exception, viewed as <typeparamref name="T"/>.</returns>
    internal static async Task<T> ThrowsAsync<T>(Func<Task> func) where T : class
    {
        Exception caught;
        try
        {
            await func();
            caught = null; // no exception: the assertions below report the failure
        }
        catch (Exception error)
        {
            caught = error;
        }

        caught.Should().NotBeNull();
        caught.Should().BeAssignableTo<T>();
        return caught as T;
    }

    // Shared arrange step: a fresh worker plus a 500 ms timer that calls Cancel() on it.
    private static Worker CreateWorkerWithPendingCancellation()
    {
        var worker = new Worker();
        var cancelSignal = Observable.Timer(TimeSpan.FromMilliseconds(500));
        cancelSignal.Subscribe(_ => worker.Cancel());
        return worker;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment