Skip to content

Instantly share code, notes, and snippets.

@makomweb
Created October 9, 2014 08:34
Show Gist options
  • Save makomweb/a5c98a4e8c82049f5029 to your computer and use it in GitHub Desktop.
Save makomweb/a5c98a4e8c82049f5029 to your computer and use it in GitHub Desktop.
Issue, cancel and reissue async operation
/// <summary>
/// Exercises issuing, cancelling and re-issuing the asynchronous operation
/// exposed by <see cref="Worker"/>.
/// </summary>
public class WorkerCancellationTests
{
    /// <summary>An operation that is never cancelled yields <c>true</c>.</summary>
    [Test]
    public async Task When_running_async_then_result_should_be_true()
    {
        var sut = new Worker();

        var outcome = await sut.DoAsync(100.Milliseconds());

        outcome.Should().BeTrue();
    }

    /// <summary>
    /// Calling <c>Cancel</c> before any operation has been issued must not
    /// affect an operation started afterwards.
    /// </summary>
    [Test]
    public async Task When_running_after_cancelled_then_should_be_true()
    {
        var sut = new Worker();
        sut.Cancel();

        var outcome = await sut.DoAsync(100.Milliseconds());

        outcome.Should().BeTrue();
    }

    /// <summary>
    /// Cancelling while an operation is in flight faults that operation with
    /// <see cref="TaskCanceledException"/>; a subsequent operation still succeeds.
    /// </summary>
    [Test]
    public async Task When_cancelled_before_complete_then_should_throw()
    {
        var sut = new Worker();

        // Start the first operation but do not await it yet, so that it can be
        // cancelled while still pending.
        var inFlight = sut.DoAsync(500.Milliseconds());
        sut.Cancel();

        Assert.Throws<TaskCanceledException>(async () => await inFlight);

        // Re-issue on the same worker: the earlier cancellation must not leak into it.
        var reissued = await sut.DoAsync(200.Milliseconds());
        reissued.Should().BeTrue();
    }
}
/// <summary>
/// Reactive helpers for surfacing cancellation inside an observable sequence.
/// </summary>
public static class MyReactiveExtensions
{
    /// <summary>
    /// Projects each element of <paramref name="source"/> through a cancellation check:
    /// elements are passed on unchanged while the token is not cancelled, and replaced by
    /// an error of type <see cref="TaskCanceledException"/> once it is.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence whose elements are checked.</param>
    /// <param name="cancellationToken">Token inspected each time an element arrives.</param>
    /// <returns>The guarded sequence.</returns>
    public static IObservable<T> ThrowWhenCancelled<T>(
        this IObservable<T> source,
        CancellationToken cancellationToken)
    {
        // The token is only consulted when an element arrives; cancellation is
        // therefore observed at the next element, not at the moment it is requested.
        return source.SelectMany(element =>
        {
            if (cancellationToken.IsCancellationRequested)
            {
                return Observable.Throw<T>(new TaskCanceledException());
            }

            return Observable.Return(element);
        });
    }
}
/// <summary>
/// Runs a timer-based asynchronous operation that can be cancelled and then issued again.
/// </summary>
/// <remarks>
/// A <see cref="CancellationTokenSource"/> is created lazily the first time an operation is
/// issued. Cancelling detaches that source from the worker, so the next operation obtains a
/// fresh, uncancelled token. Access to the cached source is not synchronized.
/// </remarks>
public class Worker
{
    private CancellationTokenSource _cts;

    /// <summary>
    /// Token of the current cancellation source; the source is created on first access.
    /// </summary>
    private CancellationToken CancellationToken
    {
        get
        {
            if (_cts == null)
            {
                _cts = new CancellationTokenSource();
            }

            return _cts.Token;
        }
    }

    /// <summary>
    /// Issues the operation and completes with the first value it produces.
    /// </summary>
    /// <param name="delay">Period of the underlying interval timer.</param>
    /// <returns>
    /// <c>true</c> when the first timer tick arrives and cancellation has not been requested.
    /// The task faults with <see cref="TaskCanceledException"/> when cancellation has been
    /// requested by the time that tick arrives.
    /// </returns>
    public async Task<bool> DoAsync(TimeSpan delay)
    {
        return await Do(delay).FirstAsync();
    }

    /// <summary>
    /// Builds the observable pipeline: an interval timer guarded by the current token,
    /// with every tick mapped to <c>true</c>.
    /// </summary>
    private IObservable<bool> Do(TimeSpan interval)
    {
        // The token is read here, when the pipeline is composed, so each issued
        // operation is bound to the source that was current at that moment.
        return Observable.Interval(interval).ThrowWhenCancelled(CancellationToken).Select(_ => true);
    }

    /// <summary>
    /// Schedules cancellation of the operations issued so far after <paramref name="delay"/>.
    /// Does nothing when no operation has been issued since the last cancellation.
    /// </summary>
    /// <param name="delay">Time to wait before cancellation is requested.</param>
    public void CancelAfter(TimeSpan delay)
    {
        var scheduled = _cts;
        if (scheduled == null)
        {
            return;
        }

        // Arm the timer first so that an invalid delay leaves the worker state untouched.
        scheduled.CancelAfter(delay);

        // Detach the source so the next operation gets a fresh token, mirroring Cancel().
        // The source is deliberately NOT disposed here: disposing it would tear down the
        // timer armed above and the delayed cancellation would never fire.
        _cts = null;
    }

    /// <summary>
    /// Immediately cancels the operations issued so far and releases the cancellation source.
    /// Does nothing when no operation has been issued since the last cancellation.
    /// </summary>
    public void Cancel()
    {
        var current = _cts;
        if (current == null)
        {
            return;
        }

        current.Cancel();
        current.Dispose();
        _cts = null;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment