// CircuitBreaker with tests
/**
The MIT License (MIT)
Copyright (c) 2015 Bartosz Sypytkowski
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
/// <summary>
/// States a circuit breaker can be in.
/// </summary>
public enum CircuitBreakerState
{
    /// <summary>Calls are rejected without reaching the underlying service.</summary>
    Open = 0,

    /// <summary>Calls are passed through to the underlying service.</summary>
    Closed = 1,

    /// <summary>A single probing call is passed through; all others are rejected.</summary>
    HalfOpen = 2
}
/// <summary>
/// Exception thrown when circuit breaker is called while in either Open state
/// or in HalfOpen state for each call except the first one.
/// </summary>
[Serializable]
public class CircuitBreakerOpenException : Exception
{
    /// <summary>
    /// Creates the exception with the default message.
    /// </summary>
    public CircuitBreakerOpenException()
    {
    }

    /// <summary>
    /// Creates the exception with the given <paramref name="message"/>.
    /// </summary>
    /// <param name="message">Description of why the call was rejected.</param>
    public CircuitBreakerOpenException(string message)
        : base(message)
    {
    }

    /// <summary>
    /// Creates the exception with the given <paramref name="message"/> and the exception that caused it.
    /// </summary>
    /// <param name="message">Description of why the call was rejected.</param>
    /// <param name="innerException">Underlying cause, or <c>null</c> when there is none.</param>
    public CircuitBreakerOpenException(string message, Exception innerException)
        : base(message, innerException)
    {
    }

    /// <summary>
    /// Deserialization constructor; forwards the serialized data to <see cref="Exception"/>.
    /// </summary>
    /// <param name="info">Serialized object data.</param>
    /// <param name="context">Source/destination context of the serialization stream.</param>
    protected CircuitBreakerOpenException(SerializationInfo info, StreamingContext context)
        : base(info, context)
    {
    }
}
/// <summary>
/// Asynchronous circuit breaker wrapping a single service delegate.
/// While <see cref="CircuitBreakerState.Closed"/> every call reaches the service. Any failure or
/// timeout switches it to <see cref="CircuitBreakerState.Open"/>, where calls are rejected. After
/// <see cref="OpenedTimeout"/> it becomes <see cref="CircuitBreakerState.HalfOpen"/> and lets exactly
/// one probing call through to decide whether to close or re-open.
/// </summary>
/// <typeparam name="TReq">Type of the request passed to the service.</typeparam>
/// <typeparam name="TRep">Type of the reply produced by the service.</typeparam>
public class CircuitBreaker<TReq, TRep>
{
    // _state and _wasProbed are ints because Interlocked/Volatile methods operate on ints,
    // not on enums or bools.
    private int _state = (int)CircuitBreakerState.Closed;

    /// <summary>
    /// Flag (0 or 1) determining if the probing call was already let through while in HalfOpen state.
    /// </summary>
    private int _wasProbed = 0;

    /// <summary>
    /// Service to be handled by the circuit breaker.
    /// </summary>
    private readonly Func<TReq, Task<TRep>> _service;

    /// <summary>
    /// Timeout for service call. If service won't respond until timeout occurs,
    /// circuit breaker becomes <see cref="CircuitBreakerState.Open"/>.
    /// </summary>
    public readonly TimeSpan ServiceTimeout;

    /// <summary>
    /// Timeout for open state. After switching to <see cref="CircuitBreakerState.Open"/>,
    /// circuit breaker can remain in that state only for specified timeout. Then it becomes
    /// <see cref="CircuitBreakerState.HalfOpen"/>.
    /// </summary>
    public readonly TimeSpan OpenedTimeout;

    /// <summary>
    /// Create new instance of the <see cref="CircuitBreaker{TReq, TRep}"/> class working as a facade
    /// to access provided <paramref name="service"/> call.
    /// </summary>
    /// <param name="service">Delegate performing the actual asynchronous call.</param>
    /// <param name="serviceTimeout">Time limit given to <paramref name="service"/> to execute.</param>
    /// <param name="openedTimeout">
    /// Defines an interval, after which circuit will automatically switch from Open to HalfOpen state.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="service"/> is <c>null</c>.</exception>
    public CircuitBreaker(Func<TReq, Task<TRep>> service, TimeSpan serviceTimeout, TimeSpan openedTimeout)
    {
        // Fail at construction time instead of with a NullReferenceException on the first call.
        if (service == null) throw new ArgumentNullException("service");

        _service = service;
        ServiceTimeout = serviceTimeout;
        OpenedTimeout = openedTimeout;
    }

    /// <summary>
    /// Gets current state of the circuit breaker.
    /// </summary>
    public CircuitBreakerState State
    {
        // Volatile read so that transitions made by timer/continuation threads are observed.
        get { return (CircuitBreakerState)Volatile.Read(ref _state); }
    }

    /// <summary>
    /// Performs an async call to underlying service. If it fails or doesn't finish in specified
    /// <see cref="ServiceTimeout"/>, either the service's exception or a <see cref="TimeoutException"/>
    /// is thrown and circuit switches to <see cref="CircuitBreakerState.Open"/> state.
    ///
    /// Circuit breaker stays in <see cref="CircuitBreakerState.Open"/> state for time specified
    /// in <see cref="OpenedTimeout"/> field. While in that state any following calls will result in
    /// task ending with <see cref="CircuitBreakerOpenException"/> being thrown. After
    /// <see cref="OpenedTimeout"/> circuit will come into <see cref="CircuitBreakerState.HalfOpen"/> state.
    ///
    /// While in <see cref="CircuitBreakerState.HalfOpen"/> state first call will result in calling
    /// an underlying service. Any subsequent calls will result in task with
    /// <see cref="CircuitBreakerOpenException"/> being thrown inside. If the probing call fails,
    /// circuit switches back to <see cref="CircuitBreakerState.Open"/> state for specified
    /// timeout. If call succeeds, it switches back to <see cref="CircuitBreakerState.Closed"/> state.
    /// </summary>
    /// <param name="request">Request forwarded to the underlying service.</param>
    /// <returns>Task completing with the service reply, or failing as described above.</returns>
    /// <exception cref="NotSupportedException">The breaker is in an unknown state.</exception>
    public Task<TRep> Call(TReq request)
    {
        // Read the state once so the dispatch and the error message agree.
        var state = State;
        switch (state)
        {
            case CircuitBreakerState.Closed: return CallClosed(request);
            case CircuitBreakerState.Open: return CallOpen(request);
            case CircuitBreakerState.HalfOpen: return CallHalfOpen(request);
            default: throw new NotSupportedException("Circuit breaker don't support state of " + state);
        }
    }

    /// <summary>
    /// HalfOpen handling: the first caller to flip <c>_wasProbed</c> from 0 to 1 becomes the probe,
    /// everyone else is rejected as if the circuit were open.
    /// </summary>
    private Task<TRep> CallHalfOpen(TReq request)
    {
        // pass first call to underlying service, and block all others
        if (Interlocked.CompareExchange(ref _wasProbed, 1, 0) == 0)
        {
            return CallClosed(request, true);
        }

        return CallOpen(request);
    }

    /// <summary>
    /// Invokes the service and races it against <see cref="ServiceTimeout"/>.
    /// Any outcome other than successful completion opens the circuit.
    /// </summary>
    /// <param name="request">Request forwarded to the underlying service.</param>
    /// <param name="resetState">
    /// <c>true</c> for the HalfOpen probe: a successful reply closes the circuit again.
    /// </param>
    private async Task<TRep> CallClosed(TReq request, bool resetState = false)
    {
        Task<TRep> serviceTask;
        try
        {
            serviceTask = _service(request);
        }
        catch
        {
            // A delegate that throws synchronously is a failure like any other; without this
            // a failed HalfOpen probe would leave the breaker stuck in HalfOpen.
            BecomeOpen();
            throw;
        }

        if (serviceTask == null)
        {
            BecomeOpen();
            throw new InvalidOperationException("Underlying service returned a null task.");
        }

        using (var timeoutCancellation = new CancellationTokenSource())
        {
            var timeoutTask = Task.Delay(ServiceTimeout, timeoutCancellation.Token);
            var finished = await Task.WhenAny(serviceTask, timeoutTask).ConfigureAwait(false);
            if (finished != serviceTask)
            {
                // when task didn't finish in specified timeout, open circuit
                ObserveLateFault(serviceTask);
                BecomeOpen();
                throw new TimeoutException(string.Format("Circuit breaker timed out while waiting for underlying service to finish in {0} timeout", ServiceTimeout));
            }

            // The service answered first: release the pending delay timer right away
            // instead of keeping it alive until ServiceTimeout elapses.
            timeoutCancellation.Cancel();
        }

        if (serviceTask.Status == TaskStatus.RanToCompletion)
        {
            if (resetState) BecomeClosed();
        }
        else
        {
            // Faulted or cancelled: open the circuit. A cancelled probe must not close it.
            BecomeOpen();
        }

        // The task is already complete. Awaiting it returns the result or rethrows the first
        // inner exception with its original stack trace intact.
        return await serviceTask.ConfigureAwait(false);
    }

    /// <summary>
    /// Produces an already-faulted task carrying <see cref="CircuitBreakerOpenException"/>.
    /// </summary>
    private Task<TRep> CallOpen(TReq request)
    {
        var rejection = new TaskCompletionSource<TRep>();
        rejection.SetException(new CircuitBreakerOpenException("Circuit breaker is open. Default time left for become half opened is " + OpenedTimeout));
        return rejection.Task;
    }

    /// <summary>
    /// Marks a fault of an abandoned (timed-out) service task as observed so that it is not
    /// reported later through the unobserved-task-exception mechanism.
    /// </summary>
    private static void ObserveLateFault(Task abandoned)
    {
        abandoned.ContinueWith(
            t => { var ignored = t.Exception; },
            CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Switches to Closed and frees the probe slot.
    /// </summary>
    private void BecomeClosed()
    {
        Interlocked.Exchange(ref _state, (int)CircuitBreakerState.Closed);
        Interlocked.Exchange(ref _wasProbed, 0);
    }

    /// <summary>
    /// Timer callback: moves Open to HalfOpen. Does nothing when the circuit has left the Open
    /// state in the meantime, so a stale timer cannot half-open a circuit that is already Closed.
    /// </summary>
    private void BecomeHalfOpen()
    {
        Interlocked.CompareExchange(ref _state, (int)CircuitBreakerState.HalfOpen, (int)CircuitBreakerState.Open);
    }

    /// <summary>
    /// Switches to Open, frees the probe slot and schedules the transition to HalfOpen.
    /// </summary>
    private void BecomeOpen()
    {
        Interlocked.Exchange(ref _state, (int)CircuitBreakerState.Open);
        // Free the probe slot; otherwise a failed probe would leave _wasProbed at 1 and every
        // future HalfOpen period would reject all calls, so the breaker could never recover.
        Interlocked.Exchange(ref _wasProbed, 0);
        // setup task for switching to HalfOpen state
        Task.Delay(OpenedTimeout).ContinueWith(_ => BecomeHalfOpen(), TaskScheduler.Default);
    }
}
#region tests | |
/// <summary>
/// Test helpers for asserting on tasks that are expected to fail.
/// </summary>
internal static class TaskFailureExtensions
{
    /// <summary>
    /// Blocks until <paramref name="t"/> completes, then asserts that it faulted and that its
    /// first inner exception is a <typeparamref name="TException"/>.
    /// </summary>
    /// <typeparam name="TException">Expected type of the first inner exception.</typeparam>
    /// <param name="t">Task expected to end in the faulted state.</param>
    internal static void ShouldFailWith<TException>(this Task t) where TException : Exception
    {
        try
        {
            t.Wait();
        }
        catch (AggregateException)
        {
            // Expected for a faulted or cancelled task; the outcome is inspected below.
        }

        // Assert on the calling thread so a failed assertion reaches the test runner as-is,
        // not wrapped in the AggregateException of a continuation task.
        t.IsFaulted.ShouldBeTrue("Task should end with failure");
        t.Exception.InnerExceptions.First().ShouldBeInstanceOf<TException>();
    }
}
/// <summary>
/// Tests for the state transitions of <c>CircuitBreaker&lt;int, int&gt;</c>:
/// Closed to Open on failure or timeout, Open to HalfOpen after the opened timeout,
/// and HalfOpen to Closed or Open depending on the probing call.
/// </summary>
public class CircuitBreakerTests
{
    /// <summary>Marker exception thrown by the fake services used below.</summary>
    internal class TestException : Exception { }

    // Time limit given to the wrapped service in every test.
    private TimeSpan _callTimeout = TimeSpan.FromSeconds(1);

    // How long the circuit stays Open before it turns HalfOpen.
    private TimeSpan _reopenDelay = TimeSpan.FromSeconds(5);

    [Fact]
    public void CircuitBreaker_should_pass_on_success_of_underlying_service()
    {
        var breaker = NewBreaker(async x => x + 1);

        var answer = breaker.Call(1).Result;

        answer.ShouldBe(2);
    }

    [Fact]
    public void CircuitBreaker_should_fail_if_underlying_service_fails()
    {
        var breaker = NewBreaker(AlwaysFail);

        breaker.Call(1).ShouldFailWith<TestException>();
    }

    [Fact]
    public void CircuitBreaker_should_fail_if_underlying_service_is_unresponsive_over_timeout()
    {
        var breaker = NewBreaker(async x =>
        {
            await Task.Delay(_callTimeout + TimeSpan.FromSeconds(1));
            return x + 1;
        });

        breaker.Call(1).ShouldFailWith<TimeoutException>();
    }

    [Fact]
    public void CircuitBreaker_should_switch_to_open_on_any_failure()
    {
        var breaker = NewBreaker(AlwaysFail);

        RunSyncWithoutThrowing(() => breaker.Call(1));

        breaker.State.ShouldBe(CircuitBreakerState.Open);
    }

    [Fact]
    public void CircuitBreaker_should_throw_exception_on_call_under_Opened_state()
    {
        var tooLong = _callTimeout + TimeSpan.FromSeconds(1);
        var breaker = NewBreaker(async x =>
        {
            // Only request 1 is slow enough to trip the service timeout.
            if (x == 1) await Task.Delay(tooLong);
            return x + 1;
        });

        breaker.Call(1).ShouldFailWith<TimeoutException>();
        breaker.State.ShouldBe(CircuitBreakerState.Open);
        breaker.Call(3).ShouldFailWith<CircuitBreakerOpenException>();
    }

    [Fact]
    public void CircuitBreaker_should_become_HalfOpen_after_specified_timeout()
    {
        var breaker = NewBreaker(FailOnOne);

        RunSyncWithoutThrowing(() => breaker.Call(1));
        SleepPastReopenDelay();

        breaker.State.ShouldBe(CircuitBreakerState.HalfOpen);
    }

    [Fact]
    public void CircuitBreaker_should_allow_first_call_under_HalfOpened_state()
    {
        var breaker = NewBreaker(FailOnOne);

        RunSyncWithoutThrowing(() => breaker.Call(1));
        SleepPastReopenDelay();

        breaker.Call(2).Result.ShouldBe(3);
    }

    [Fact]
    public void CircuitBreaker_should_throw_exception_for_following_calls_in_HalfOpened_state()
    {
        var breaker = NewBreaker(async x =>
        {
            if (x == 1) throw new TestException();
            await Task.Delay((int)_callTimeout.TotalMilliseconds / 2);
            return x + 1;
        });

        // circuit fails and becomes Open
        breaker.Call(1).ShouldFailWith<TestException>();
        // wait for it to become HalfOpen
        SleepPastReopenDelay();

        // first call should pass
        var probe = breaker.Call(2);
        // second should fail
        breaker.Call(3).ShouldFailWith<CircuitBreakerOpenException>();
        probe.Result.ShouldBe(3);
    }

    [Fact]
    public void CircuitBreaker_should_switch_to_Closed_if_call_under_HalfOpened_succeeds()
    {
        var breaker = NewBreaker(FailOnOne);

        RunSyncWithoutThrowing(() => breaker.Call(1));
        SleepPastReopenDelay();

        breaker.Call(2).Result.ShouldBe(3);
        breaker.State.ShouldBe(CircuitBreakerState.Closed);
    }

    [Fact]
    public void CircuitBreaker_should_switch_back_to_Opened_if_call_under_HalfOpened_fails()
    {
        var breaker = NewBreaker(FailOnOne);

        RunSyncWithoutThrowing(() => breaker.Call(1));
        SleepPastReopenDelay();
        RunSyncWithoutThrowing(() => breaker.Call(1));

        breaker.State.ShouldBe(CircuitBreakerState.Open);
    }

    /// <summary>
    /// Builds a breaker around <paramref name="service"/> using the timeouts shared by all tests.
    /// </summary>
    private CircuitBreaker<int, int> NewBreaker(Func<int, Task<int>> service)
    {
        return new CircuitBreaker<int, int>(service, _callTimeout, _reopenDelay);
    }

    /// <summary>
    /// Fake service: fails with <see cref="TestException"/> for request 1, otherwise replies x + 1.
    /// </summary>
    private static async Task<int> FailOnOne(int x)
    {
        if (x == 1) throw new TestException();
        else return x + 1;
    }

    /// <summary>
    /// Fake service whose task always faults with <see cref="TestException"/>.
    /// </summary>
    private static async Task<int> AlwaysFail(int x)
    {
        throw new TestException();
    }

    /// <summary>
    /// Blocks one second longer than the opened timeout.
    /// </summary>
    private void SleepPastReopenDelay()
    {
        Thread.Sleep(_reopenDelay + TimeSpan.FromSeconds(1));
    }

    /// <summary>
    /// Runs <paramref name="action"/> to completion on the thread pool and discards any exception it throws.
    /// </summary>
    private static void RunSyncWithoutThrowing(Func<Task> action)
    {
        var guarded = Task.Run(async () =>
        {
            try
            {
                await action();
            }
            catch (Exception)
            {
                // Deliberately ignored: callers only care about the breaker state afterwards.
            }
        });

        guarded.Wait();
    }
}
#endregion |
// Sign up for free
// to join this conversation on GitHub.
// Already have an account?
// Sign in to comment