Skip to content

Instantly share code, notes, and snippets.

@ignasi35
Forked from gszeliga/custom_circuit_breaker.scala
Last active August 29, 2015 14:06
Show Gist options
  • Save ignasi35/3b9ea257545fb556b85d to your computer and use it in GitHub Desktop.
Save ignasi35/3b9ea257545fb556b85d to your computer and use it in GitHub Desktop.
While reviewing the code at http://covariantblabbering.blogspot.com.es/2014/09/use-circuit-breakers-goddammit.html?view=classic I found some things I wanted to rewrite and ended up with this code.
package com.covariantblabbering
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.duration.Duration
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
/**
 * Settings for a single circuit breaker.
 *
 * @param name             label used to identify the breaker in its log output
 * @param timeout          how long the breaker stays open before a reset may be attempted
 *                         (defaults to 2000 milliseconds)
 * @param failureThreshold number of failures at which the breaker trips (defaults to 3)
 */
class Configuration(
  val name: String,
  val timeout: Duration = Duration(2000, MILLISECONDS),
  val failureThreshold: Int = 3
)
/**
 * Signals that a call was rejected because the circuit breaker is open and its
 * timeout has not elapsed yet.
 *
 * @param msg human-readable description, forwarded to [[java.lang.Exception]]
 */
class CircuitBreakerStillOpenException(msg: String)
  extends Exception(msg)
/**
 * One state of the circuit-breaker state machine.
 *
 * Every hook has an empty default implementation, so a concrete state only
 * overrides the events it reacts to.
 */
trait State {

  /** Label of this state; it is also what `toString` returns. */
  def name: String

  /** Hook invoked before the protected body runs. Does nothing by default. */
  def preInvoke(cb: CircuitBreaker): Unit = ()

  /** Hook invoked after the protected body completed (formerly `postInvoke`). Does nothing by default. */
  def onSuccess(cb: CircuitBreaker): Unit = ()

  /**
   * Hook invoked after the protected body threw. Does nothing by default.
   *
   * @param source the exception raised by the body
   */
  def onError(cb: CircuitBreaker, source: Exception): Unit = ()

  override def toString: String = name
}
/** Companion of [[CircuitBreaker]]; lets callers write `CircuitBreaker(conf)` instead of `new`. */
object CircuitBreaker {

  /**
   * Creates a circuit breaker from the given configuration.
   *
   * @param conf settings of the new breaker
   * @return a freshly constructed breaker
   */
  def apply(conf: Configuration): CircuitBreaker = {
    val breaker = new CircuitBreaker(conf)
    breaker
  }
}
/**
 * Circuit breaker that guards a block of code with a three-state machine:
 * `Close` (calls pass through), `Open` (calls are rejected) and `Half-Open`
 * (the breaker was open, its timeout elapsed, and calls are let through to
 * probe whether the protected resource recovered).
 *
 * The current state, the failure counter and the time of the last trip are
 * kept in `java.util.concurrent.atomic` holders, and every state change is a
 * compare-and-set that fails with an `IllegalStateException` when the breaker
 * is no longer in the expected source state.
 *
 * @param conf name, open-timeout and failure threshold of this breaker
 */
class CircuitBreaker(private val conf: Configuration) {

  // The fields are declared before the initial transition below because that
  // transition reads all three of them while the constructor is running.
  private val state = new AtomicReference[State]
  private[covariantblabbering] val failureCount = new AtomicInteger(0)
  protected val _tripTime = new AtomicLong(0)

  // Initialization: the reference starts out empty (null) and is moved to Close
  // through the regular transition path, so the initial state is logged as well.
  transition(null, Close)

  /**
   * Runs `body` under the protection of this breaker.
   *
   * The result is a `Try[T]` rather than an `Either`, because the failure side
   * always carries an exception.
   *
   * @param body the computation to protect; evaluated at most once per call
   * @tparam T result type of the computation
   * @return `Success(result)` when `body` completed;
   *         `Failure(e)` with the exception thrown by `body` (after it was reported to the
   *         current state) when `body` threw an `Exception`;
   *         `Failure(CircuitBreakerStillOpenException)` when the breaker is open and its
   *         timeout has not elapsed (in which case `body` is not evaluated);
   *         `Failure(IllegalStateException)` when one of the breaker's own state transitions
   *         found the breaker in an unexpected state.
   *         Throwables that are not `Exception`s propagate to the caller.
   */
  def invoke[T](body: => T): Try[T] =
    try {
      state.get.preInvoke(this)

      // The body gets its own try/catch so that an IllegalStateException (or any other
      // exception) it throws is always treated as a failure of the protected call and is
      // never confused with an exception raised by the breaker's own bookkeeping.
      val outcome: Either[Exception, T] =
        try Right(body)
        catch { case e: Exception => Left(e) }

      outcome match {
        case Right(result) =>
          state.get.onSuccess(this)
          Success(result)
        case Left(e) =>
          state.get.onError(this, e)
          Failure(e)
      }
    } catch {
      // Only the breaker's bookkeeping (preInvoke / onSuccess / onError) can get here.
      // These outcomes are reported as failures but are not counted as errors of the body.
      case e @ (_: CircuitBreakerStillOpenException | _: IllegalStateException) => Failure(e)
    }

  /** State entered from `Open` once the timeout elapsed; the next outcome decides where to go. */
  private object HalfOpen extends State {
    def name = "Half-Open"

    /** A success while probing closes the breaker again. */
    override def onSuccess(cb: CircuitBreaker): Unit = cb.reset()

    /** A failure while probing is counted and sends the breaker straight back to `Open`. */
    override def onError(cb: CircuitBreaker, source: Exception): Unit = {
      cb.incrementAndGetFailureCount
      cb.breakTrip(this)
    }
  }

  /** State in which calls are rejected until the configured timeout has elapsed. */
  private object Open extends State {
    def name = "Open"

    /**
     * Moves to `Half-Open` when the timeout elapsed; otherwise rejects the call.
     *
     * @throws CircuitBreakerStillOpenException when the timeout has not elapsed yet
     */
    override def preInvoke(cb: CircuitBreaker): Unit =
      if (cb.timedOut) cb.attemptReset()
      else throw new CircuitBreakerStillOpenException(s"Circuit breaker is still open. Elapsed time: ${cb.elapsed}")
  }

  /** Normal operating state: calls pass through and failures are counted. */
  private object Close extends State {
    def name = "Close"

    /** Any success clears the counter, so only failures since the last success are counted. */
    override def onSuccess(cb: CircuitBreaker): Unit = cb.resetFailureCount()

    /** Counts the failure and trips the breaker once the threshold is reached. */
    override def onError(cb: CircuitBreaker, source: Exception): Unit = {
      val failures = cb.incrementAndGetFailureCount
      if (failures >= cb.failureThreshold) cb.breakTrip(this)
    }
  }

  /**
   * Atomically moves the breaker from `from` to `to`, logging the request first.
   *
   * @throws IllegalStateException when the breaker is not currently in `from`
   */
  private def transition(from: State, to: State): Unit = {
    println(s"[${conf.name}] Transition [$from => $to] requested [fc: ${failureCount.get}, tt: ${_tripTime.get}]")
    val swapped = state.compareAndSet(from, to)
    if (!swapped) throw new IllegalStateException(s"Illegal transition attempted from ${from} to ${to}")
  }

  /** Closes the breaker after a successful probe and clears the failure counter. */
  private def reset(): Unit = {
    println(s"[${conf.name}] Reset")
    transition(HalfOpen, Close)
    resetFailureCount()
  }

  /** Time passed since the breaker last tripped. */
  private def elapsed = (System.currentTimeMillis() - _tripTime.get).millis

  /** Whether the breaker has been open for longer than the configured timeout. */
  private def timedOut: Boolean = elapsed > conf.timeout

  /** Moves from `Open` to `Half-Open` so that calls may probe the protected resource. */
  private def attemptReset(): Unit = {
    println(s"[${conf.name}] Attempting reset")
    transition(Open, HalfOpen)
  }

  private def failureThreshold: Int = conf.failureThreshold

  private[covariantblabbering] def incrementAndGetFailureCount: Int = failureCount.incrementAndGet()

  /**
   * Trips the breaker: records the trip time and moves from `from` to `Open`
   * (formerly `tripFromState`).
   *
   * @throws IllegalStateException when the breaker is not currently in `from`
   */
  private[covariantblabbering] def breakTrip(from: State): Unit = {
    println(s"[${conf.name}] Trip from state [$from]")
    // The trip time is stored before the Open state becomes visible. Otherwise a concurrent
    // caller could observe Open together with the previous (stale) trip time, conclude that
    // the timeout already elapsed and move to Half-Open immediately.
    _tripTime.set(System.currentTimeMillis())
    transition(from, Open)
  }

  private[covariantblabbering] def resetFailureCount(): Unit = failureCount.set(0)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment