Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
AsyncObservable
// This works best for cold observables where you want to run Async methods after you subscribe.
// I may incorporate this into my QueryRunner....Not sure.
using System;
using System.Threading.Tasks;
namespace AsyncObservable
{
public interface IAsyncObserver<in T>
{
Task OnNextAsync(T value);
Task OnErrorAsync(Exception error);
Task OnCompletedAsync();
}
public interface IAsyncObservable<out T>
{
Task<IDisposable> SubscribeAsync(IAsyncObserver<T> observer);
}
public static class AsyncObservableExtensions
{
public static Task<IDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, Task> onNext, Func<Exception, Task> onError, Func<Task> onCompleted)
{
var observer = new AsyncObserver<T>(onNext, onError, onCompleted);
return source.SubscribeAsync(observer);
}
}
public class AsyncObserver<T> : IAsyncObserver<T>
{
public readonly Func<T, Task> OnNext;
public readonly Func<Exception,Task> OnError;
public readonly Func<Task> OnCompleted;
//TODO: overload constructor with non-async methods for onError and onCompleted.
public AsyncObserver(Func<T, Task> onNext, Func<Exception, Task> onError, Func<Task> onCompleted)
{
this.OnNext = onNext;
this.OnError = onError;
this.OnCompleted = onCompleted;
}
public async Task OnNextAsync(T value)
{
await OnNext(value);
}
public async Task OnErrorAsync(Exception error)
{
await OnError(error);
}
public async Task OnCompletedAsync()
{
await OnCompleted();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.