Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Generic CircuitBreaker implementation
/// <summary>
/// States a circuit breaker can be in. The numeric values are spelled out
/// explicitly; note that <see cref="Open"/> is the zero (default) value.
/// </summary>
public enum CircuitBreakerState
{
    /// <summary>The circuit is open.</summary>
    Open = 0,

    /// <summary>The circuit is closed.</summary>
    Closed = 1,

    /// <summary>The circuit is half open.</summary>
    HalfOpen = 2
}
/// <summary>
/// Exception thrown when the circuit breaker is called while in the Open state,
/// or while in the HalfOpen state for every call except the first one.
/// </summary>
[Serializable]
public class CircuitBreakerOpenException : Exception
{
    /// <summary>
    /// Creates the exception with the default message.
    /// </summary>
    public CircuitBreakerOpenException()
    {
    }

    /// <summary>
    /// Creates the exception with the given message.
    /// </summary>
    /// <param name="message">Text describing why the call was rejected.</param>
    public CircuitBreakerOpenException(string message)
        : base(message)
    {
    }

    /// <summary>
    /// Creates the exception with the given message and the exception that caused it.
    /// </summary>
    /// <param name="message">Text describing why the call was rejected.</param>
    /// <param name="innerException">The underlying cause, or null when there is none.</param>
    public CircuitBreakerOpenException(string message, Exception innerException)
        : base(message, innerException)
    {
    }

    /// <summary>
    /// Serialization constructor; forwards the serialized data to <see cref="Exception"/>.
    /// </summary>
    /// <param name="info">Serialized object data.</param>
    /// <param name="context">Source or destination of the serialized stream.</param>
    protected CircuitBreakerOpenException(SerializationInfo info, StreamingContext context)
        : base(info, context)
    {
    }
}
/// <summary>
/// Asynchronous circuit breaker wrapping a single service delegate.
/// While Closed, calls are forwarded to the service; a failure, cancellation or timeout
/// opens the circuit. While Open, calls are rejected with <see cref="CircuitBreakerOpenException"/>.
/// After <see cref="OpenedTimeout"/> the circuit becomes HalfOpen and lets exactly one probe
/// call through; its outcome closes or re-opens the circuit.
/// </summary>
/// <typeparam name="TReq">Type of the request passed to the service.</typeparam>
/// <typeparam name="TRep">Type of the reply produced by the service.</typeparam>
public class CircuitBreaker<TReq, TRep> : IDisposable
{
    // _state and _wasProbed are ints so they can be used with Interlocked methods
    private int _state = (int)CircuitBreakerState.Closed;

    /// <summary>
    /// Flag telling whether the single probe call of the current HalfOpen period was already taken
    /// (1) or is still available (0). Re-armed every time the circuit opens or closes.
    /// </summary>
    private int _wasProbed = 0;

    /// <summary>
    /// Cancellation token source for stopping the delayed transition from Open to HalfOpen state.
    /// Only ever swapped with Interlocked.Exchange so each instance is cancelled and disposed once.
    /// </summary>
    private CancellationTokenSource _becomeHalfOpenCancellation = null;

    /// <summary>
    /// Service to be handled by the circuit breaker.
    /// </summary>
    private readonly Func<TReq, Task<TRep>> _service;

    /// <summary>
    /// Timeout for a service call. If the service does not respond before the timeout elapses,
    /// the circuit breaker becomes <see cref="CircuitBreakerState.Open"/>.
    /// </summary>
    public readonly TimeSpan ServiceTimeout;

    /// <summary>
    /// Timeout for the open state. After switching to <see cref="CircuitBreakerState.Open"/>,
    /// the circuit breaker remains in that state only for the specified timeout. Then it becomes
    /// <see cref="CircuitBreakerState.HalfOpen"/>.
    /// </summary>
    public readonly TimeSpan OpenedTimeout;

    /// <summary>
    /// Create new instance of the <see cref="CircuitBreaker{TReq, TRep}"/> class working as a facade
    /// to access provided <paramref name="service"/> call.
    /// </summary>
    /// <param name="service">Delegate performing the actual call; must not be null.</param>
    /// <param name="serviceTimeout">Time limit given to <paramref name="service"/> to execute.</param>
    /// <param name="openedTimeout">
    /// Defines an interval, after which circuit will automatically switch from Open to HalfOpen state.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="service"/> is null.</exception>
    public CircuitBreaker(Func<TReq, Task<TRep>> service, TimeSpan serviceTimeout, TimeSpan openedTimeout)
    {
        if (service == null)
        {
            throw new ArgumentNullException("service");
        }

        _service = service;
        ServiceTimeout = serviceTimeout;
        OpenedTimeout = openedTimeout;
    }

    /// <summary>
    /// Gets current state of the circuit breaker.
    /// </summary>
    public CircuitBreakerState State
    {
        // Volatile read: the field is written from timer/continuation threads.
        get { return (CircuitBreakerState)Volatile.Read(ref _state); }
    }

    /// <summary>
    /// Performs an async call to the underlying service. If it fails, is cancelled, or does not
    /// finish within <see cref="ServiceTimeout"/>, the returned task ends with the service's own
    /// exception, a cancellation, or a <see cref="TimeoutException"/> respectively, and the circuit
    /// switches to <see cref="CircuitBreakerState.Open"/> state.
    ///
    /// The circuit stays in <see cref="CircuitBreakerState.Open"/> state for the time specified
    /// in <see cref="OpenedTimeout"/>. While in that state any following calls result in a
    /// task ending with <see cref="CircuitBreakerOpenException"/>. After
    /// <see cref="OpenedTimeout"/> the circuit comes into <see cref="CircuitBreakerState.HalfOpen"/> state.
    ///
    /// While in <see cref="CircuitBreakerState.HalfOpen"/> state the first call is passed to the
    /// underlying service. Any other call made meanwhile results in a task ending with
    /// <see cref="CircuitBreakerOpenException"/>. If the probe call fails, the circuit switches back
    /// to <see cref="CircuitBreakerState.Open"/> state for the specified timeout. If it succeeds,
    /// the circuit switches back to <see cref="CircuitBreakerState.Closed"/> state.
    /// </summary>
    /// <param name="request">Request forwarded to the service.</param>
    /// <returns>Task producing the service reply, or ending with one of the failures above.</returns>
    /// <exception cref="NotSupportedException">The internal state holds an unknown value.</exception>
    public Task<TRep> Call(TReq request)
    {
        // Read the state once so the dispatch and the error message cannot disagree.
        var state = State;
        switch (state)
        {
            case CircuitBreakerState.Closed: return CallClosed(request);
            case CircuitBreakerState.Open: return CallOpen(request);
            case CircuitBreakerState.HalfOpen: return CallHalfOpen(request);
            default: throw new NotSupportedException("Circuit breaker don't support state of " + state);
        }
    }

    /// <summary>
    /// Cancels the pending Open-to-HalfOpen transition, if any.
    /// </summary>
    public void Dispose()
    {
        StopHalfOpenTick();
    }

    /// <summary>
    /// HalfOpen handling: the first caller to flip <see cref="_wasProbed"/> becomes the probe,
    /// everybody else is rejected as if the circuit was open.
    /// </summary>
    private Task<TRep> CallHalfOpen(TReq request)
    {
        // pass first call to underlying service, and block all others
        if (Interlocked.CompareExchange(ref _wasProbed, 1, 0) == 0)
        {
            return CallClosed(request, true);
        }

        return CallOpen(request);
    }

    /// <summary>
    /// Invokes the service and races it against <see cref="ServiceTimeout"/>.
    /// Any outcome other than a successful reply opens the circuit.
    /// </summary>
    /// <param name="request">Request forwarded to the service.</param>
    /// <param name="resetState">When true (HalfOpen probe), a successful reply closes the circuit.</param>
    private async Task<TRep> CallClosed(TReq request, bool resetState = false)
    {
        Task<TRep> serviceTask;
        try
        {
            serviceTask = _service(request);
            if (serviceTask == null)
            {
                throw new InvalidOperationException("Circuit breaker service returned a null task");
            }
        }
        catch (Exception)
        {
            // A delegate that throws before returning a task is a failed call as well;
            // without this a failed HalfOpen probe would leave the circuit stuck in HalfOpen.
            BecomeOpen();
            throw;
        }

        using (var cancellation = new CancellationTokenSource())
        {
            var timeoutTask = Task.Delay(ServiceTimeout, cancellation.Token);
            var finished = await Task.WhenAny(new Task[] { serviceTask, timeoutTask }).ConfigureAwait(false);
            if (finished != serviceTask)
            {
                // The abandoned call may still fault later; observe that exception so it
                // does not surface as an unobserved task exception.
                serviceTask.ContinueWith(
                    t => { var ignored = t.Exception; },
                    CancellationToken.None,
                    TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
                    TaskScheduler.Default);

                // when task didn't finish in specified timeout, open circuit
                BecomeOpen();
                throw new TimeoutException(string.Format("Circuit breaker timed out while waiting for underlying service to finish in {0} timeout", ServiceTimeout));
            }

            // The service answered first: stop the timeout timer instead of letting it run out.
            cancellation.Cancel();
        }

        TRep reply;
        try
        {
            // The task is already completed; awaiting rethrows the original exception with
            // its stack trace intact (or a cancellation exception for a cancelled task).
            reply = await serviceTask.ConfigureAwait(false);
        }
        catch (Exception)
        {
            // if task failed or was cancelled, open circuit
            BecomeOpen();
            throw;
        }

        if (resetState)
        {
            BecomeClosed();
        }

        return reply;
    }

    /// <summary>
    /// Rejects the call: returns a task already faulted with <see cref="CircuitBreakerOpenException"/>.
    /// </summary>
    private Task<TRep> CallOpen(TReq request)
    {
        var rejection = new TaskCompletionSource<TRep>();
        rejection.SetException(new CircuitBreakerOpenException("Circuit breaker is open. Default time left for become half opened is " + OpenedTimeout));
        return rejection.Task;
    }

    /// <summary>
    /// Switches to Closed, re-arms the probe flag and drops any pending half-open timer.
    /// </summary>
    private void BecomeClosed()
    {
        Interlocked.Exchange(ref _state, (int)CircuitBreakerState.Closed);
        Interlocked.Exchange(ref _wasProbed, 0);
        StopHalfOpenTick();
    }

    /// <summary>
    /// Timer callback: moves Open to HalfOpen. A timer firing after the circuit already left
    /// the Open state does not change the state.
    /// </summary>
    private void BecomeHalfOpen()
    {
        Interlocked.CompareExchange(ref _state, (int)CircuitBreakerState.HalfOpen, (int)CircuitBreakerState.Open);
        StopHalfOpenTick();
    }

    /// <summary>
    /// Switches to Open and schedules the transition to HalfOpen after <see cref="OpenedTimeout"/>,
    /// replacing (and cancelling) any previously scheduled transition.
    /// </summary>
    private void BecomeOpen()
    {
        Interlocked.Exchange(ref _state, (int)CircuitBreakerState.Open);
        // Re-arm the probe: otherwise a failed probe would block every call of the next HalfOpen period.
        Interlocked.Exchange(ref _wasProbed, 0);

        // setup task for switching to HalfOpen state
        var cancellation = new CancellationTokenSource();
        // Take the token before publishing the source: another thread may dispose it right after.
        var token = cancellation.Token;
        var previous = Interlocked.Exchange(ref _becomeHalfOpenCancellation, cancellation);
        if (previous != null)
        {
            previous.Cancel();
            previous.Dispose();
        }

        Task.Delay(OpenedTimeout, token).ContinueWith(
            t => BecomeHalfOpen(),
            token,
            TaskContinuationOptions.OnlyOnRanToCompletion,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Cancels and releases the currently scheduled Open-to-HalfOpen transition, if there is one.
    /// </summary>
    private void StopHalfOpenTick()
    {
        var cancellation = Interlocked.Exchange(ref _becomeHalfOpenCancellation, null);
        if (cancellation != null)
        {
            cancellation.Cancel();
            cancellation.Dispose();
        }
    }
}
#region test suite
/// <summary>
/// Assertion helpers for tasks that are expected to fail.
/// </summary>
internal static class TaskFailureExtensions
{
    /// <summary>
    /// Blocks until <paramref name="t"/> completes, then asserts that it faulted and that the
    /// first inner exception of its aggregate is a <typeparamref name="TException"/>.
    /// The assertions run inside a continuation, which is then waited on.
    /// </summary>
    /// <typeparam name="TException">Exception type the task is expected to fail with.</typeparam>
    /// <param name="t">Task under inspection.</param>
    internal static void ShouldFailWith<TException>(this Task t) where TException : Exception
    {
        Task verification = t.ContinueWith(completed =>
        {
            bool faulted = completed.IsFaulted;
            faulted.ShouldBeTrue("Task should end with failure");

            Exception firstFailure = completed.Exception.InnerExceptions.First();
            firstFailure.ShouldBeInstanceOf<TException>();
        });

        verification.Wait();
    }
}
/// <summary>
/// Tests for the circuit breaker: pass-through on success, opening on failure or timeout,
/// the timed transition to HalfOpen, and the single-probe behavior while HalfOpen.
/// </summary>
public class CircuitBreakerTests
{
    /// <summary>Marker exception thrown by the fake services.</summary>
    internal class TestException : Exception { }

    private TimeSpan defaultTimeout = TimeSpan.FromSeconds(1);
    private TimeSpan openedTimeout = TimeSpan.FromSeconds(5);

    /// <summary>Builds a circuit around <paramref name="service"/> using the shared timeouts.</summary>
    private CircuitBreaker<int, int> CreateCircuit(Func<int, Task<int>> service)
    {
        return new CircuitBreaker<int, int>(service, defaultTimeout, openedTimeout);
    }

    /// <summary>Fake service that fails on every request.</summary>
    private static async Task<int> AlwaysFailing(int x)
    {
        throw new TestException();
    }

    /// <summary>Fake service that fails for request 1 and otherwise replies with x + 1.</summary>
    private static async Task<int> FailingOnlyForOne(int x)
    {
        if (x == 1)
        {
            throw new TestException();
        }

        return x + 1;
    }

    /// <summary>Blocks one second longer than the opened timeout.</summary>
    private void SleepPastOpenedTimeout()
    {
        Thread.Sleep(openedTimeout + TimeSpan.FromSeconds(1));
    }

    [Fact]
    public void CircuitBreaker_should_pass_on_success_of_underlying_service()
    {
        var breaker = CreateCircuit(async x => x + 1);

        var answer = breaker.Call(1).Result;

        answer.ShouldBe(2);
    }

    [Fact]
    public void CircuitBreaker_should_fail_if_underlying_service_fails()
    {
        var breaker = CreateCircuit(AlwaysFailing);

        breaker.Call(1).ShouldFailWith<TestException>();
    }

    [Fact]
    public void CircuitBreaker_should_fail_if_underlying_service_is_unresponsive_over_timeout()
    {
        var breaker = CreateCircuit(async x =>
        {
            await Task.Delay(defaultTimeout + TimeSpan.FromSeconds(1));
            return x + 1;
        });

        breaker.Call(1).ShouldFailWith<TimeoutException>();
    }

    [Fact]
    public void CircuitBreaker_should_switch_to_open_on_any_failure()
    {
        var breaker = CreateCircuit(AlwaysFailing);

        RunSyncWithoutThrowing(() => breaker.Call(1));

        breaker.State.ShouldBe(CircuitBreakerState.Open);
    }

    [Fact]
    public void CircuitBreaker_should_throw_exception_on_call_under_Opened_state()
    {
        var tooLong = defaultTimeout + TimeSpan.FromSeconds(1);
        var breaker = CreateCircuit(async x =>
        {
            if (x == 1)
            {
                await Task.Delay(tooLong);
            }

            return x + 1;
        });

        breaker.Call(1).ShouldFailWith<TimeoutException>();
        breaker.State.ShouldBe(CircuitBreakerState.Open);
        breaker.Call(3).ShouldFailWith<CircuitBreakerOpenException>();
    }

    [Fact]
    public void CircuitBreaker_should_become_HalfOpen_after_specified_timeout()
    {
        var breaker = CreateCircuit(FailingOnlyForOne);

        RunSyncWithoutThrowing(() => breaker.Call(1));
        SleepPastOpenedTimeout();

        breaker.State.ShouldBe(CircuitBreakerState.HalfOpen);
    }

    [Fact]
    public void CircuitBreaker_should_allow_first_call_under_HalfOpened_state()
    {
        var breaker = CreateCircuit(FailingOnlyForOne);

        RunSyncWithoutThrowing(() => breaker.Call(1));
        SleepPastOpenedTimeout();

        breaker.Call(2).Result.ShouldBe(3);
    }

    [Fact]
    public void CircuitBreaker_should_throw_exception_for_following_calls_in_HalfOpened_state()
    {
        var breaker = CreateCircuit(async x =>
        {
            if (x == 1)
            {
                throw new TestException();
            }

            await Task.Delay((int) defaultTimeout.TotalMilliseconds/2);
            return x + 1;
        });

        // circuit fails and becomes Open
        breaker.Call(1).ShouldFailWith<TestException>();
        // wait for it to become HalfOpen
        SleepPastOpenedTimeout();

        // the first call is the probe and goes through ...
        var probe = breaker.Call(2);
        // ... while a second call made meanwhile is rejected
        breaker.Call(3).ShouldFailWith<CircuitBreakerOpenException>();
        probe.Result.ShouldBe(3);
    }

    [Fact]
    public void CircuitBreaker_should_switch_to_Closed_if_call_under_HalfOpened_succeeds()
    {
        var breaker = CreateCircuit(FailingOnlyForOne);

        RunSyncWithoutThrowing(() => breaker.Call(1));
        SleepPastOpenedTimeout();

        breaker.Call(2).Result.ShouldBe(3);
        breaker.State.ShouldBe(CircuitBreakerState.Closed);
    }

    [Fact]
    public void CircuitBreaker_should_switch_back_to_Opened_if_call_under_HalfOpened_fails()
    {
        var breaker = CreateCircuit(FailingOnlyForOne);

        RunSyncWithoutThrowing(() => breaker.Call(1));
        SleepPastOpenedTimeout();
        RunSyncWithoutThrowing(() => breaker.Call(1));

        breaker.State.ShouldBe(CircuitBreakerState.Open);
    }

    /// <summary>
    /// Runs <paramref name="action"/> on the thread pool, blocks until it finishes and
    /// discards any exception it ends with.
    /// </summary>
    private static void RunSyncWithoutThrowing(Func<Task> action)
    {
        Func<Task> swallowing = async () =>
        {
            try
            {
                await action();
            }
            catch (Exception)
            {
                // deliberately ignored: callers only care about the resulting circuit state
            }
        };

        Task.Run(swallowing).Wait();
    }
}
#endregion
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment