Skip to content

Instantly share code, notes, and snippets.

@kzu
Created February 21, 2018 15:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kzu/159e9a74f7283b7141c117db035a782a to your computer and use it in GitHub Desktop.
Save kzu/159e9a74f7283b7141c117db035a782a to your computer and use it in GitHub Desktop.
ObservableExtensions
public static class ObservableExtensions
{
static readonly Action<Exception> rethrow = e => ExceptionDispatchInfo.Capture(e).Throw();
static readonly Action nop = () => { };
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
=> Subscribe(source, onNext, rethrow, nop);
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError)
=> Subscribe(source, onNext, onError, nop);
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted)
=> Subscribe(source, onNext, rethrow, onCompleted);
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
=> source.Subscribe(new Observer<T>(onNext, onError, onCompleted));
class Observer<T> : IObserver<T>
{
Action<T> onNext;
Action<Exception> onError;
Action onCompleted;
public Observer(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}
public void OnCompleted() => onCompleted();
public void OnError(Exception error) => onError(error);
public void OnNext(T value) => onNext(value);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment