Last active
August 29, 2015 14:17
-
-
Save mattpodwysocki/e63b9911c0a1a0f8a62f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Returns a task that will receive all values produced by the observable sequence, or the exception it produced.
/// </summary>
/// <typeparam name="TResult">The type of the elements in the source sequence.</typeparam>
/// <param name="observable">Observable sequence to convert to a task.</param>
/// <param name="cancellationToken">Cancellation token that can be used to cancel the task, causing unsubscription from the observable sequence.</param>
/// <param name="state">The state to use as the underlying task's AsyncState.</param>
/// <returns>
/// A task that completes with every element observed (in arrival order) once the sequence signals completion,
/// faults with the exception signalled by the sequence or thrown while subscribing, or is canceled when
/// <paramref name="cancellationToken"/> is canceled first.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="observable"/> is null.</exception>
public static Task<IEnumerable<TResult>> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, object state)
{
    if (observable == null)
    {
        throw new ArgumentNullException("observable");
    }

    // Buffer for every element received; handed to the task as its result on completion.
    var values = new List<TResult>();

    // The completion source must carry the same result type as the returned task
    // (the whole collection, not a single element).
    var tcs = new TaskCompletionSource<IEnumerable<TResult>>(state);

    var disposable = new SingleAssignmentDisposable();

    var ctr = default(CancellationTokenRegistration);
    if (cancellationToken.CanBeCanceled)
    {
        // On cancellation: unsubscribe from the source, then move the task to the canceled state.
        ctr = cancellationToken.Register(() =>
        {
            disposable.Dispose();
            tcs.TrySetCanceled();
        });
    }

    var taskCompletionObserver = new AnonymousObserver<TResult>(
        value =>
        {
            values.Add(value);
        },
        ex =>
        {
            tcs.TrySetException(ex);
            ctr.Dispose(); // no null-check needed (struct)
            disposable.Dispose();
        },
        () =>
        {
            tcs.TrySetResult(values);
            ctr.Dispose(); // no null-check needed (struct)
            disposable.Dispose();
        }
    );

    //
    // Subtle race condition: if the source completes before we reach the line below, the SingleAssignmentDisposable
    // will already have been disposed. Upon assignment, the disposable resource being set will be disposed on the
    // spot, which may throw an exception. (Similar to TFS 487142)
    //
    try
    {
        //
        // [OK] Use of unsafe Subscribe: we're catching the exception here to set the TaskCompletionSource.
        //
        // Notice we could use a safe subscription to route errors through OnError, but we still need the
        // exception handling logic here for the reason explained above. We cannot afford to throw here
        // and as a result never set the TaskCompletionSource, so we tunnel everything through here.
        //
        disposable.Disposable = observable.Subscribe/*Unsafe*/(taskCompletionObserver);
    }
    catch (Exception ex)
    {
        tcs.TrySetException(ex);

        // The observer callbacks may never run on this path, so release the cancellation
        // registration here as well; disposing it again (or a default struct) is harmless.
        ctr.Dispose();
    }

    return tcs.Task;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment