Skip to content

Instantly share code, notes, and snippets.

@drstevens
Created November 30, 2011 17:05
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save drstevens/1409829 to your computer and use it in GitHub Desktop.
Save drstevens/1409829 to your computer and use it in GitHub Desktop.
Globally handle fatal exceptions in Rx.Net subscribers
/* This is how I am globally handling fatal exceptions thrown from Rx subscribers.
* It is what I came up with in response to my stackoverflow question here
* http://stackoverflow.com/questions/7210051/catching-exceptions-which-may-be-thrown-from-a-subscription-onnext-action
* This is far from ideal. From what I understand, exception handling has been improved greatly in Rx for .NET 4.5
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NLog;
using NUnit.Framework;
namespace RxExceptionHandling
{
/// <summary>
/// Contract for routing fatal exceptions raised inside callbacks (for example Rx
/// subscriber actions) to a single handler.
/// </summary>
/// <remarks>
/// Design note: giving this interface a <c>TException</c> type parameter was considered
/// and rejected, because the interface is used in many places and each of them would have
/// to repeat the type argument of the instance. <see cref="Exception"/> is used instead,
/// which means this must be used for fatal events only. Non-fatal exceptions must be
/// caught inside the action passed to <see cref="Catch{T}"/>.
/// </remarks>
public interface IExceptionCatcher
{
/// <summary>
/// Wraps <paramref name="action"/> in a delegate that can be handed to asynchronous
/// callers. In the <c>ExceptionCatcher</c> implementation in this file, the wrapper
/// forwards any exception thrown by <paramref name="action"/> to
/// <see cref="HandleException"/> instead of letting it propagate.
/// </summary>
/// <typeparam name="T">Type of the argument accepted by the action.</typeparam>
/// <param name="action">The callback to protect.</param>
/// <returns>A delegate with the same signature as <paramref name="action"/>.</returns>
Action<T> Catch<T>(Action<T> action);
/// <summary>
/// Reports a fatal exception to this catcher.
/// </summary>
/// <param name="e">The exception to report.</param>
void HandleException(Exception e);
}
/// <summary>
/// Provides a way for exceptions thrown by asynchronously executed code (for example Rx
/// subscribers) to be collected and handed, as a single <see cref="AggregateException"/>,
/// to an error action running on a background task.
/// </summary>
/// <remarks>
/// The background task is started by the constructor and blocks until the first call to
/// <see cref="HandleException"/> or <see cref="Dispose"/>. It then invokes the error action
/// at most once, with the exceptions aggregated up to that moment. Exceptions handled after
/// the task has woken up are still aggregated but are not reported again.
/// </remarks>
public class ExceptionCatcher : IExceptionCatcher, IDisposable
{
    private static readonly Logger Logger = LogManager.GetCurrentClassLogger();

    // Guards _exceptions and _isDisposed.
    private readonly object _lockObject = new object();

    // Signalled when the reporting task should wake up (first exception or Dispose).
    private readonly ManualResetEvent _resetEvent;

    private AggregateException _exceptions;
    private readonly Task _task;

    // Read without the lock from Catch wrappers, hence volatile.
    private volatile bool _isExceptionCaught;

    private readonly bool _continueOnError;
    private bool _isDisposed;

    /// <summary>
    /// Creates the catcher and starts the background task that will invoke
    /// <paramref name="errorAction"/>.
    /// </summary>
    /// <param name="errorAction">
    /// Invoked on the background task with the aggregated exceptions. Exceptions it throws
    /// are logged and fault the task; <see cref="Dispose"/> logs and swallows them.
    /// </param>
    /// <param name="continueOnError">
    /// When <c>false</c>, delegates returned by <see cref="Catch{T}"/> stop invoking their
    /// wrapped action once any exception has been handled. When <c>true</c> they keep
    /// invoking it.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="errorAction"/> is null.</exception>
    public ExceptionCatcher(Action<AggregateException> errorAction, bool continueOnError)
    {
        // Fail fast: a null action would otherwise only surface as a logged
        // NullReferenceException on the background task, losing the real error.
        if (errorAction == null)
        {
            throw new ArgumentNullException("errorAction");
        }

        _continueOnError = continueOnError;
        _resetEvent = new ManualResetEvent(false);

        // The task blocks on the event for the lifetime of this instance, so ask the
        // scheduler not to tie up a regular thread-pool worker for it.
        _task = Task.Factory.StartNew(() => ReportWhenSignalled(errorAction),
                                      TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Wraps <paramref name="action"/> so that any exception it throws is passed to
    /// <see cref="HandleException"/> instead of propagating to the caller.
    /// </summary>
    /// <typeparam name="T">Type of the argument accepted by the action.</typeparam>
    /// <param name="action">The callback to protect.</param>
    /// <returns>
    /// A delegate that invokes <paramref name="action"/> unless an exception has already
    /// been handled and this instance was created with <c>continueOnError == false</c>.
    /// </returns>
    public Action<T> Catch<T>(Action<T> action)
    {
        return arg =>
        {
            try
            {
                if (!IsExecutionPrevented())
                {
                    action(arg);
                }
            }
            catch (Exception e)
            {
                HandleException(e);
            }
        };
    }

    /// <summary>
    /// Records <paramref name="e"/> and wakes the background task so that the error action
    /// can run. The most recently handled exception is placed first in the aggregate.
    /// </summary>
    /// <param name="e">The exception to record.</param>
    /// <exception cref="ArgumentNullException"><paramref name="e"/> is null.</exception>
    public void HandleException(Exception e)
    {
        // AggregateException rejects null inner exceptions with a bare ArgumentException;
        // check up front so the failure is explicit and no state is changed.
        if (e == null)
        {
            throw new ArgumentNullException("e");
        }

        _isExceptionCaught = true;
        lock (_lockObject)
        {
            var exceptions = new List<Exception> {e};
            if (_exceptions != null)
            {
                exceptions.AddRange(_exceptions.InnerExceptions);
            }
            _exceptions = new AggregateException(String.Format("Exceptions handled by {0}", GetType().FullName),
                                                 exceptions);

            // After Dispose the event has already been set and may have been released;
            // touching it again could throw ObjectDisposedException.
            if (!_isDisposed)
            {
                _resetEvent.Set(); //Signal task to wake up
            }
        }
    }

    /// <summary>
    /// Wakes the background task if it has not been woken yet, waits up to ten seconds for
    /// it to finish and releases the wait handle. Never throws; a failure of the error
    /// action is logged. Calling this more than once is a no-op.
    /// </summary>
    public void Dispose()
    {
        lock (_lockObject)
        {
            if (_isDisposed)
            {
                return;
            }
            _isDisposed = true;

            //wake up error handling task in case it hasn't been yet
            _resetEvent.Set();
        }

        try
        {
            _task.Wait(TimeSpan.FromSeconds(10));
        }
        catch (Exception e)
        {
            //eat exception in Dispose
            Logger.ErrorException("Exception handling task completed exceptionally: " + e.Message, e);
        }
        finally
        {
            // Only release the handle once the task can no longer be waiting on it. If the
            // wait timed out, the handle is left for the finalizer to reclaim.
            if (_task.IsCompleted)
            {
                _resetEvent.Dispose();
            }
        }
    }

    /// <summary>
    /// Body of the background task: blocks until signalled, then reports whatever has been
    /// aggregated so far to <paramref name="errorAction"/>.
    /// </summary>
    private void ReportWhenSignalled(Action<AggregateException> errorAction)
    {
        try
        {
            _resetEvent.WaitOne();

            AggregateException exceptions;
            lock (_lockObject)
            {
                exceptions = _exceptions;
            }

            // Null means we were woken by Dispose without any exception being handled.
            if (exceptions != null)
            {
                errorAction(exceptions);
            }
        }
        catch (Exception ex)
        {
            Logger.ErrorException("Error In Exception Action:" + ex.Message,
                                  ex);
            throw;
        }
        finally
        {
            Logger.Trace("Exiting...");
        }
    }

    /// <summary>
    /// True when wrapped actions must no longer run: an exception has been handled and this
    /// instance does not continue on error.
    /// </summary>
    private bool IsExecutionPrevented()
    {
        return _isExceptionCaught && !_continueOnError;
    }
}
/// <summary>
/// Subscription helpers that keep exceptions thrown by an <c>onNext</c> callback from
/// escaping into the observable pipeline.
/// </summary>
public static class Observables
{
    /// <summary>
    /// Subscribes via <see cref="System.ObservableExtensions.Subscribe{TSource}"/>, routing
    /// any exception thrown by <paramref name="onNext"/> to <paramref name="onError"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to subscribe to.</param>
    /// <param name="onNext">Callback invoked for each element.</param>
    /// <param name="onError">
    /// Receives exceptions thrown by <paramref name="onNext"/>; it is also passed on as the
    /// subscription's error callback.
    /// </param>
    /// <param name="onCompleted">Callback passed on as the subscription's completion callback.</param>
    /// <returns>The subscription handle returned by the underlying <c>Subscribe</c> call.</returns>
    public static IDisposable SubscribeWithExceptionCatching<TSource>(this IObservable<TSource> source,
                                                                      Action<TSource> onNext,
                                                                      Action<Exception> onError,
                                                                      Action onCompleted)
    {
        Action<TSource> guardedOnNext = value =>
        {
            try
            {
                onNext(value);
            }
            catch (Exception failure)
            {
                onError(failure);
            }
        };

        return source.Subscribe(guardedOnNext, onError, onCompleted);
    }

    /// <summary>
    /// Subscribes via <see cref="System.ObservableExtensions.Subscribe{TSource}"/>, using
    /// <paramref name="exceptionCatcher"/> both to wrap <paramref name="onNext"/> and as the
    /// subscription's error callback.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to subscribe to.</param>
    /// <param name="onNext">Callback invoked for each element.</param>
    /// <param name="exceptionCatcher">Catcher that receives the exceptions.</param>
    /// <param name="onCompleted">Callback passed on as the subscription's completion callback.</param>
    /// <returns>The subscription handle returned by the underlying <c>Subscribe</c> call.</returns>
    public static IDisposable SubscribeWithExceptionCatching<TSource>(this IObservable<TSource> source,
                                                                      Action<TSource> onNext,
                                                                      IExceptionCatcher exceptionCatcher,
                                                                      Action onCompleted)
    {
        Action<TSource> guardedOnNext = exceptionCatcher.Catch(onNext);
        Action<Exception> reportError = exceptionCatcher.HandleException;

        return source.Subscribe(guardedOnNext, reportError, onCompleted);
    }

    /// <summary>
    /// Subscribes via <see cref="System.ObservableExtensions.Subscribe{TSource}"/>, using
    /// <paramref name="exceptionCatcher"/> both to wrap <paramref name="onNext"/> and as the
    /// subscription's error callback. No completion callback is supplied.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to subscribe to.</param>
    /// <param name="onNext">Callback invoked for each element.</param>
    /// <param name="exceptionCatcher">Catcher that receives the exceptions.</param>
    /// <returns>The subscription handle returned by the underlying <c>Subscribe</c> call.</returns>
    public static IDisposable SubscribeWithExceptionCatching<TSource>(this IObservable<TSource> source,
                                                                      Action<TSource> onNext,
                                                                      IExceptionCatcher exceptionCatcher)
    {
        Action<TSource> guardedOnNext = exceptionCatcher.Catch(onNext);
        Action<Exception> reportError = exceptionCatcher.HandleException;

        return source.Subscribe(guardedOnNext, reportError);
    }
}
/// <summary>
/// Tests for <see cref="ExceptionCatcher"/>. Each test disposes the catcher (via
/// <c>using</c>) before asserting, because <c>Dispose</c> waits for the background
/// reporting task to finish.
/// </summary>
[TestFixture]
public class ExceptionCatcherFixture
{
    private static readonly IEnumerable<int> OneToTen = Enumerable.Range(1, 10);
    private static readonly Exception ExpectedError = new Exception("BobSagat!");

    [Test(Description = "Verify that an exception handled on another thread reaches the error action")]
    public void TestHandleException()
    {
        //Arrange
        AggregateException actualException = null;
        bool handledInTime;

        //Act
        using (var handler = new ExceptionCatcher(e => actualException = e, false))
        {
            var startNew = Task.Factory.StartNew(() => handler.HandleException(ExpectedError));
            //need to wait for above task to complete
            handledInTime = startNew.Wait(TimeSpan.FromSeconds(5));
        }

        //Assert
        Assert.That(handledInTime, Is.True, "HandleException did not complete within the timeout");
        Assert.That(actualException, Is.Not.Null);
        Assert.That(actualException.InnerExceptions, Contains.Item(ExpectedError));
    }

    // NOTE: despite its name this test exercises the throwing path. The name is kept so
    // that existing test filters and history keep working.
    [Test(Description = "Verify that an exception thrown by a wrapped action reaches the error action")]
    public void TestExecuteWithNoException()
    {
        //Arrange
        AggregateException actualException = null;

        //Act
        using (var handler = new ExceptionCatcher(e => actualException = e, false))
        {
            var action = handler.Catch<Exception>(Throw);
            action(ExpectedError);
        }

        //Assert
        Assert.That(actualException, Is.Not.Null);
        Assert.That(actualException.InnerExceptions, Contains.Item(ExpectedError));
    }

    [Test(Description = "Verify that the subsequent action calls are prevented after exception handled")]
    public void TestThatActionIsPrevented()
    {
        //Arrange
        var lastActionValue = 0;

        //Act
        using (var handler = new ExceptionCatcher(DoNothing, false))
        {
            var tryAction = handler.Catch<int>(i =>
            {
                lastActionValue = i;
                throw ExpectedError;
            });
            foreach (var value in OneToTen)
            {
                tryAction(value);
            }
        }

        //Assert
        Assert.That(lastActionValue, Is.EqualTo(1));
    }

    [Test(Description = "Verify that the subsequent action calls are NOT prevented after exception handled")]
    public void TestThatActionIsNotPrevented()
    {
        //Arrange
        var lastActionValue = 0;

        //Act
        using (var handler = new ExceptionCatcher(DoNothing, true))
        {
            var tryAction = handler.Catch<int>(i =>
            {
                lastActionValue = i;
                throw ExpectedError;
            });
            foreach (var value in OneToTen)
            {
                tryAction(value);
            }
        }

        //Assert
        Assert.That(lastActionValue, Is.EqualTo(OneToTen.Last()));
    }

    [Test(Description = "Verify that exceptions thrown from the error action are handled")]
    public void TestThatExceptionsFromErrorActionHandled()
    {
        //Arrange, Act and Assert: neither HandleException nor Dispose may let the
        //exception thrown by the error action escape to the caller.
        Assert.DoesNotThrow(() =>
        {
            using (var handler = new ExceptionCatcher(_ => Throw(ExpectedError), true))
            {
                handler.HandleException(new Exception("Non Expected Exception"));
            }
        });
    }

    /// <summary>
    /// No-op callback usable wherever an <c>Action&lt;T&gt;</c> is required.
    /// </summary>
    /// <param name="_">Ignored.</param>
    public static void DoNothing<T>(T _)
    {
    }

    /// <summary>
    /// convert a throw into an expression for use in lambdas
    /// </summary>
    /// <param name="e">The exception to throw.</param>
    public static void Throw(Exception e)
    {
        throw e;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment