Skip to content

Instantly share code, notes, and snippets.

Forked from drstevens/gist:1409829
Created July 22, 2014 11:55
Show Gist options
  • Save mika76/700afe91f6a8e5e394f1 to your computer and use it in GitHub Desktop.
Save mika76/700afe91f6a8e5e394f1 to your computer and use it in GitHub Desktop.
/* This is how I am globally handling fatal exceptions thrown from Rx subscribers.
* It is what I came up with in response to my stackoverflow question here
* This is far from ideal. From what I understand, exception handling has been improved greatly in Rx for .NET 4.5.
*/
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using NLog;
using NUnit.Framework;
namespace RxExceptionHandling
/// <summary>
/// I toyed with making this take a type param TException
/// but decided against it because there are many places this is used.
/// Each would have to duplicate this type parameter of the instance.
/// I instead used Exception and require that this be used for fatal events only.
/// Non fatal events must be caught in the action parameter
/// </summary>
public interface IExceptionCatcher
{
    /// <summary>
    /// Wraps <paramref name="action"/> in a delegate of the same shape so that the catcher
    /// can intercept exceptions thrown while the action runs.
    /// </summary>
    /// <typeparam name="T">Type of the argument passed to the action.</typeparam>
    /// <param name="action">The action to protect. Non-fatal errors must be caught inside it.</param>
    /// <returns>A delegate that callers invoke in place of <paramref name="action"/>.</returns>
    Action<T> Catch<T>(Action<T> action);

    /// <summary>
    /// Reports a fatal exception to the catcher.
    /// </summary>
    /// <param name="e">The exception to report.</param>
    void HandleException(Exception e);
}
/// <summary>
/// This provides a way for exceptions to be handled from asynchronously executed code
/// </summary>
public class ExceptionCatcher : IExceptionCatcher, IDisposable
// NOTE(review): several lines of this class (braces, try blocks, statement tails) appear to be
// missing from this copy. The comments below describe only what is visible; anything inferred
// is marked as an assumption to confirm against the original gist.
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
// Held around every visible read and write of _exceptions.
private readonly object _lockObject = new object();
// Created unsignalled in the constructor; set by HandleException to wake the background task.
private readonly ManualResetEvent _resetEvent;
// Aggregate built by HandleException; not assigned anywhere else in the visible code.
private AggregateException _exceptions;
// Background task started by the constructor.
private readonly Task _task;
// Set to true by HandleException and read by IsExecutionPrevented.
private volatile bool _isExceptionCaught;
// When true, IsExecutionPrevented always returns false.
private readonly bool _continueOnError;
/// <summary>
/// Stores <paramref name="continueOnError"/>, creates the unsignalled reset event and starts the
/// background task, which takes a snapshot of <c>_exceptions</c> under the lock.
/// </summary>
/// <param name="errorAction">Callback for the collected exceptions. NOTE(review): its invocation is
/// not visible in this copy; presumably the task calls it when the snapshot is non-null - confirm.</param>
/// <param name="continueOnError">When false, IsExecutionPrevented returns true once an exception
/// has been handled.</param>
public ExceptionCatcher(Action<AggregateException> errorAction, bool continueOnError)
_continueOnError = continueOnError;
_resetEvent = new ManualResetEvent(false);
_task = Task.Factory.StartNew(() =>
AggregateException exceptions;
// Copy the shared field to a local while holding the lock.
lock (_lockObject)
exceptions = _exceptions;
if (exceptions != null)
// Exceptions caught here are logged through NLog.
catch (Exception ex)
Logger.ErrorException("Error In Exception Action:" + ex.Message,
/// <summary>
/// Returns a delegate wrapping <paramref name="action"/>. The wrapper consults
/// IsExecutionPrevented() and contains a catch-all handler.
/// NOTE(review): the guarded call and the handler body are missing from this copy; presumably
/// they are action(arg) and HandleException(e) - confirm.
/// </summary>
public Action<T> Catch<T>(Action<T> action)
return arg =>
if (!IsExecutionPrevented())
catch (Exception e)
/// <summary>
/// Records <paramref name="e"/>: flags that an exception was caught, replaces <c>_exceptions</c>
/// with a new AggregateException, and signals the reset event.
/// </summary>
public void HandleException(Exception e)
_isExceptionCaught = true;
lock (_lockObject)
var exceptions = new List<Exception> {e};
// NOTE(review): presumably the previously recorded exceptions are merged into the list here;
// the body of this branch is not visible - confirm.
if (_exceptions != null)
_exceptions = new AggregateException(String.Format("Exceptions handled by {0}", GetType().FullName),
_resetEvent.Set(); //Signal task to wake up
/// <summary>
/// Releases the catcher. An exception caught while doing so is logged; the original comment
/// below says it is deliberately swallowed.
/// </summary>
public void Dispose()
//wake up error handling task in case it hasn't been yet
catch (Exception e)
//eat exception in Dispose
Logger.ErrorException("Exception handling task completed exceptionally: " + e.Message, e);
/// <summary>
/// Tells whether wrapped actions should be skipped.
/// </summary>
/// <returns>
/// <c>true</c> once an exception has been handled and the catcher was created with
/// <c>continueOnError == false</c>; otherwise <c>false</c>.
/// </returns>
private bool IsExecutionPrevented()
{
    return _isExceptionCaught && !_continueOnError;
}
public static class Observables
/// <summary>
/// Delegate to <see cref="System.ObservableExtensions.Subscribe{TSource}"/>
/// but catch any exceptions resulting from calls to <paramref name="onNext"/>.
/// Handle these exceptions using <paramref name="onError"/>
/// </summary>
/// <typeparam name="TSource">Element type of the observable sequence.</typeparam>
/// <param name="source">The observable sequence to subscribe to.</param>
/// <param name="onNext">Action to run for each element; exceptions it throws are caught.</param>
/// <param name="onError">Passed to Subscribe as the error callback; per the summary it also
/// receives the exceptions caught from <paramref name="onNext"/>.</param>
/// <param name="onCompleted">Passed to Subscribe as the completion callback.</param>
/// <returns>The subscription handle returned by Subscribe.</returns>
public static IDisposable SubscribeWithExceptionCatching<TSource>(this IObservable<TSource> source,
Action<TSource> onNext,
Action<Exception> onError,
Action onCompleted)
return source.Subscribe(item =>
// NOTE(review): the guarded call and the handler body are missing from this copy; per the
// summary above they are presumably onNext(item) and onError(e) - confirm against the original.
catch (Exception e)
}, onError, onCompleted);
/// <summary>
/// Delegate to <see cref="System.ObservableExtensions.Subscribe{TSource}"/>
/// but catch any exceptions resulting from calls to <paramref name="onNext"/>.
/// Handle these exceptions using <paramref name="exceptionCatcher"/>
/// </summary>
/// <typeparam name="TSource">Element type of the observable sequence.</typeparam>
/// <param name="source">The observable sequence to subscribe to.</param>
/// <param name="onNext">Action to run for each element; it is wrapped with
/// <see cref="IExceptionCatcher.Catch{T}"/> before being handed to Subscribe.</param>
/// <param name="exceptionCatcher">This is used to catch the exceptions. Its
/// <see cref="IExceptionCatcher.HandleException"/> is also used as the error callback.</param>
/// <param name="onCompleted">Passed to Subscribe as the completion callback.</param>
/// <returns>The subscription handle returned by Subscribe.</returns>
/// <exception cref="ArgumentNullException"><paramref name="exceptionCatcher"/> is null.</exception>
public static IDisposable SubscribeWithExceptionCatching<TSource>(this IObservable<TSource> source,
    Action<TSource> onNext,
    IExceptionCatcher exceptionCatcher,
    Action onCompleted)
{
    // Fail with a descriptive exception instead of a NullReferenceException on the calls below.
    if (exceptionCatcher == null)
    {
        throw new ArgumentNullException("exceptionCatcher");
    }

    return source.Subscribe(exceptionCatcher.Catch(onNext), exceptionCatcher.HandleException, onCompleted);
}
/// <summary>
/// Delegate to <see cref="System.ObservableExtensions.Subscribe{TSource}"/>
/// but catch any exceptions resulting from calls to <paramref name="onNext"/>.
/// Handle these exceptions using <paramref name="exceptionCatcher"/>
/// </summary>
/// <typeparam name="TSource">Element type of the observable sequence.</typeparam>
/// <param name="source">The observable sequence to subscribe to.</param>
/// <param name="onNext">Action to run for each element; it is wrapped with
/// <see cref="IExceptionCatcher.Catch{T}"/> before being handed to Subscribe.</param>
/// <param name="exceptionCatcher">This is used to catch the exceptions. Its
/// <see cref="IExceptionCatcher.HandleException"/> is also used as the error callback.</param>
/// <returns>The subscription handle returned by Subscribe.</returns>
/// <exception cref="ArgumentNullException"><paramref name="exceptionCatcher"/> is null.</exception>
public static IDisposable SubscribeWithExceptionCatching<TSource>(this IObservable<TSource> source,
    Action<TSource> onNext,
    IExceptionCatcher exceptionCatcher)
{
    // Fail with a descriptive exception instead of a NullReferenceException on the calls below.
    if (exceptionCatcher == null)
    {
        throw new ArgumentNullException("exceptionCatcher");
    }

    return source.Subscribe(exceptionCatcher.Catch(onNext), exceptionCatcher.HandleException);
}
// NUnit tests for ExceptionCatcher.
// NOTE(review): this copy is missing lines (braces, loops, invocations), and the first two test
// methods carry no visible [Test] attribute - confirm against the original.
public class ExceptionCatcherFixture
// NOTE(review): Enumerable.Range and Last() need System.Linq, which is not among the visible
// using directives.
private static readonly IEnumerable<int> ONE_TO_TEN = Enumerable.Range(1, 10);
// Shared exception instance the tests expect to find among the reported exceptions.
private static readonly Exception EXPECTED_EXCEPTION = new Exception("BobSagat!");
// Reports EXPECTED_EXCEPTION from a separate task and asserts that the error action received an
// AggregateException containing it.
public void TestHandleException()
AggregateException actualException = null;
using (var handler = new ExceptionCatcher(e => actualException = e, false))
var startNew = Task.Factory.StartNew(() => handler.HandleException(EXPECTED_EXCEPTION));
startNew.Wait(TimeSpan.FromSeconds(5)); //need to wait for above task to complete
Assert.That(actualException, Is.Not.Null);
Assert.That(actualException.InnerExceptions, Contains.Item(EXPECTED_EXCEPTION));
// Wraps Throw with Catch and asserts that EXPECTED_EXCEPTION reached the error action.
// NOTE(review): the method name says "NoException" but the assertions expect one, and the
// invocation of the wrapped action is not visible - confirm.
public void TestExecuteWithNoException()
AggregateException actualException = null;
using (var handler = new ExceptionCatcher(e => actualException = e, false))
var action = handler.Catch<Exception>(Throw);
Assert.That(actualException != null);
Assert.That(actualException.InnerExceptions, Contains.Item(EXPECTED_EXCEPTION));
[Test(Description = "Verify that the subsequent action calls are prevented after exception handled")]
// continueOnError is false here; the final assertion expects the recorded value to be 1.
public void TestThatActionIsPrevented()
var lastActionValue = 0;
using (var handler = new ExceptionCatcher(DoNothing, false))
var tryAction = handler.Catch<int>(i =>
lastActionValue = i;
Assert.That(lastActionValue, Is.EqualTo(1));
[Test(Description = "Verify that the subsequent action calls are NOT prevented after exception handled")]
// continueOnError is true here; the final assertion expects the last value of ONE_TO_TEN.
public void TestThatActionIsNotPrevented()
var lastActionValue = 0;
using (var handler = new ExceptionCatcher(DoNothing, true))
var tryAction = handler.Catch<int>(i =>
lastActionValue = i;
Assert.That(lastActionValue, Is.EqualTo(ONE_TO_TEN.Last()));
[Test(Description = "Verify that exceptions thrown from the error action are handled")]
// The error action itself throws; no assertion is visible in this test.
public void TestThatExceptionsFromErrorActionHandled()
using (var handler = new ExceptionCatcher(_ => Throw(EXPECTED_EXCEPTION), true))
handler.HandleException(new Exception("Non Expected Exception"));
/// <summary>
/// No-op callback, usable wherever an <c>Action&lt;T&gt;</c> is required but nothing should happen.
/// </summary>
/// <typeparam name="T">Type of the ignored argument.</typeparam>
/// <param name="_">Ignored.</param>
public static void DoNothing<T>(T _)
{
}
/// <summary>
/// convert a throw into an expression for use in lambdas
/// </summary>
/// <param name="e">The exception instance to throw.</param>
public static void Throw(Exception e)
{
    throw e;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment