@canton7
Last active July 31, 2020 17:52
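using System;
using System.Threading;
using System.Threading.Tasks;

public static class ReactiveExtensionsExtensions
{
    // Subscribes to the observable and returns a Task that completes when the sequence
    // completes, faults when it errors, and is cancelled when the token is cancelled.
    public static async Task SubscribeAsync<T>(this IObservable<T> observable, Action<T> onNext, CancellationToken? cancellationToken = null)
    {
        CancellationToken token = cancellationToken ?? CancellationToken.None;
        var tcs = new TaskCompletionSource<bool>();
        // Use the Try* variants: cancellation can race with OnError/OnCompleted,
        // and a second Set* call on an already-completed source would throw.
        using (token.Register(() => tcs.TrySetCanceled()))
        {
            // This Subscribe overload (System.Reactive) unsubscribes when the token is cancelled.
            observable.Subscribe(onNext, ex => tcs.TrySetException(ex), () => tcs.TrySetResult(true), token);
            await tcs.Task;
        }
    }
}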