Skip to content

Instantly share code, notes, and snippets.

@mattpodwysocki
Last active August 29, 2015 14:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattpodwysocki/e63b9911c0a1a0f8a62f to your computer and use it in GitHub Desktop.
Save mattpodwysocki/e63b9911c0a1a0f8a62f to your computer and use it in GitHub Desktop.
/// <summary>
/// Returns a task that will receive all values, or the exception, produced by the observable sequence.
/// </summary>
/// <typeparam name="TResult">The type of the elements in the source sequence.</typeparam>
/// <param name="observable">Observable sequence to convert to a task.</param>
/// <param name="cancellationToken">Cancellation token that can be used to cancel the task, causing unsubscription from the observable sequence.</param>
/// <param name="state">The state to use as the underlying task's AsyncState.</param>
/// <returns>
/// A task that completes with every element of the sequence, in arrival order, once the sequence completes;
/// faults with the exception produced by the sequence (or thrown while subscribing); or is canceled when
/// <paramref name="cancellationToken"/> is signaled first. An empty sequence yields an empty collection.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="observable"/> is null.</exception>
public static Task<IEnumerable<TResult>> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, object state)
{
    if (observable == null)
        throw new ArgumentNullException("observable");

    // Buffer for every element seen; handed to the task as its result on completion.
    var values = new List<TResult>();

    // The completion source must carry the collection type returned by this method,
    // not the element type.
    var tcs = new TaskCompletionSource<IEnumerable<TResult>>(state);

    var disposable = new SingleAssignmentDisposable();

    var ctr = default(CancellationTokenRegistration);

    if (cancellationToken.CanBeCanceled)
    {
        // On cancellation: unsubscribe first, then move the task to the canceled state.
        ctr = cancellationToken.Register(() =>
        {
            disposable.Dispose();
            tcs.TrySetCanceled();
        });
    }

    var taskCompletionObserver = new AnonymousObserver<TResult>(
        value =>
        {
            values.Add(value);
        },
        ex =>
        {
            tcs.TrySetException(ex);

            ctr.Dispose(); // no null-check needed (struct)
            disposable.Dispose();
        },
        () =>
        {
            tcs.TrySetResult(values);

            ctr.Dispose(); // no null-check needed (struct)
            disposable.Dispose();
        }
    );

    //
    // Subtle race condition: if the source completes before we reach the line below, the SingleAssignmentDisposable
    // will already have been disposed. Upon assignment, the disposable resource being set will be disposed on the
    // spot, which may throw an exception. (Similar to TFS 487142)
    //
    try
    {
        //
        // [OK] Use of unsafe Subscribe: we're catching the exception here to set the TaskCompletionSource.
        //
        // Notice we could use a safe subscription to route errors through OnError, but we still need the
        // exception handling logic here for the reason explained above. We cannot afford to throw here
        // and as a result never set the TaskCompletionSource, so we tunnel everything through here.
        //
        disposable.Disposable = observable.Subscribe/*Unsafe*/(taskCompletionObserver);
    }
    catch (Exception ex)
    {
        tcs.TrySetException(ex);

        // The observer callbacks may never run on this path, so release the cancellation
        // registration here; otherwise it stays attached to the token. Disposing a default
        // or already-disposed registration is a no-op.
        ctr.Dispose();
    }

    return tcs.Task;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment