Created
February 21, 2018 15:38
-
-
Save kzu/159e9a74f7283b7141c117db035a782a to your computer and use it in GitHub Desktop.
ObservableExtensions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Delegate-based <c>Subscribe</c> overloads for <see cref="IObservable{T}"/>, so callers can
/// pass lambdas instead of implementing <see cref="IObserver{T}"/> by hand.
/// </summary>
public static class ObservableExtensions
{
    // Default error handler: rethrows the notified exception via ExceptionDispatchInfo so the
    // original stack trace is kept rather than being reset by a plain `throw e`.
    static readonly Action<Exception> rethrow = e => ExceptionDispatchInfo.Capture(e).Throw();

    // Default completion handler: does nothing.
    static readonly Action nop = () => { };

    /// <summary>
    /// Subscribes <paramref name="onNext"/> to <paramref name="source"/>. Error notifications
    /// are rethrown; completion is ignored.
    /// </summary>
    /// <param name="source">The observable sequence to subscribe to.</param>
    /// <param name="onNext">Invoked for each element notification.</param>
    /// <returns>The value returned by <c>source.Subscribe</c>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="onNext"/> is <c>null</c>.
    /// </exception>
    public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
        => Subscribe(source, onNext, rethrow, nop);

    /// <summary>
    /// Subscribes <paramref name="onNext"/> and <paramref name="onError"/> to
    /// <paramref name="source"/>. Completion is ignored.
    /// </summary>
    /// <param name="source">The observable sequence to subscribe to.</param>
    /// <param name="onNext">Invoked for each element notification.</param>
    /// <param name="onError">Invoked for an error notification.</param>
    /// <returns>The value returned by <c>source.Subscribe</c>.</returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError)
        => Subscribe(source, onNext, onError, nop);

    /// <summary>
    /// Subscribes <paramref name="onNext"/> and <paramref name="onCompleted"/> to
    /// <paramref name="source"/>. Error notifications are rethrown.
    /// </summary>
    /// <param name="source">The observable sequence to subscribe to.</param>
    /// <param name="onNext">Invoked for each element notification.</param>
    /// <param name="onCompleted">Invoked for the completion notification.</param>
    /// <returns>The value returned by <c>source.Subscribe</c>.</returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted)
        => Subscribe(source, onNext, rethrow, onCompleted);

    /// <summary>
    /// Subscribes the three given delegates to <paramref name="source"/> by wrapping them in
    /// an <see cref="IObserver{T}"/>. All other overloads forward here.
    /// </summary>
    /// <param name="source">The observable sequence to subscribe to.</param>
    /// <param name="onNext">Invoked for each element notification.</param>
    /// <param name="onError">Invoked for an error notification.</param>
    /// <param name="onCompleted">Invoked for the completion notification.</param>
    /// <returns>The value returned by <c>source.Subscribe</c>.</returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        // Fail at the call site: without these checks a null delegate would only surface as a
        // NullReferenceException when the observable eventually raises that notification.
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (onNext == null)
        {
            throw new ArgumentNullException(nameof(onNext));
        }

        if (onError == null)
        {
            throw new ArgumentNullException(nameof(onError));
        }

        if (onCompleted == null)
        {
            throw new ArgumentNullException(nameof(onCompleted));
        }

        return source.Subscribe(new Observer<T>(onNext, onError, onCompleted));
    }

    /// <summary>
    /// <see cref="IObserver{T}"/> that forwards each notification to the matching delegate.
    /// </summary>
    private sealed class Observer<T> : IObserver<T>
    {
        readonly Action<T> onNext;
        readonly Action<Exception> onError;
        readonly Action onCompleted;

        public Observer(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            this.onNext = onNext;
            this.onError = onError;
            this.onCompleted = onCompleted;
        }

        public void OnCompleted() => onCompleted();

        public void OnError(Exception error) => onError(error);

        public void OnNext(T value) => onNext(value);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment