// An attempt at a circuit breaker (CB) implemented with Ref.

/*
 * Copyright (c) 2017-2018 The Typelevel Cats-effect Project Developers
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cats.effect.concurrent
import java.util.concurrent.TimeUnit
import cats.syntax.all._
import cats.effect.{Clock, Sync}
import scala.concurrent.duration._
/**
 * Guards effects in `F` so that they stop being executed once they have
 * failed too often.
 *
 * Instances are created with the `of` factories on the companion object.
 */
trait CircuitBreaker[F[_]] {

  /**
   * Runs `fa` under the protection of this circuit breaker.
   *
   * The result (or error) of `fa` is passed through when the call is let in.
   * When the call is not let in, the returned effect fails with
   * [[CircuitBreaker.RejectedExecution]] and `fa` is not run.
   */
  def protect[A](fa: F[A]): F[A]

  /** Reads the current state of this circuit breaker. */
  def state: F[CircuitBreaker.S]
}
object CircuitBreaker {

  /** Millisecond reading taken from `Clock.monotonic`. */
  type TimeStamp = Long

  /** State of a circuit breaker. */
  sealed trait S

  /** State that is reported as the cause of a rejected call. */
  sealed trait Reason

  /** Calls are let through; `failures` counts the failures seen since the last success. */
  case class Closed(failures: Int) extends S

  /**
   * Calls are rejected until `resetTimeout` has elapsed after `startedAt`;
   * after that a single probe call is allowed.
   */
  case class Open(startedAt: TimeStamp, resetTimeout: FiniteDuration) extends S with Reason

  /** A probe call is in flight; every other call is rejected. */
  case object HalfOpen extends S with Reason

  private val ClosedZero = Closed(0)

  /** Error raised by `protect` when a call is rejected instead of executed. */
  case class RejectedExecution(reason: Reason) extends Throwable(s"Execution Rejected: $reason")

  /**
   * Creates a circuit breaker, initially closed, without any callbacks.
   *
   * @param maxFailures              number of failures (without a success in between)
   *                                 at which the breaker opens
   * @param resetTimeout             how long the breaker stays open before a probe is allowed
   * @param exponentialBackoffFactor multiplier applied to the reset timeout after a failed probe
   * @param maxResetTimeout          upper bound for the reset timeout; a non-finite
   *                                 duration means no bound
   */
  def of[F[_]](
    maxFailures: Int,
    resetTimeout: FiniteDuration,
    exponentialBackoffFactor: Double = 1,
    maxResetTimeout: Duration = Duration.Inf
  )(implicit F: Sync[F], C: Clock[F]): F[CircuitBreaker[F]] =
    of(maxFailures, resetTimeout, exponentialBackoffFactor, maxResetTimeout, F.unit, F.unit, F.unit, F.unit)

  /**
   * Creates a circuit breaker, initially closed, with callbacks for its transitions.
   *
   * @param maxFailures              number of failures (without a success in between)
   *                                 at which the breaker opens
   * @param resetTimeout             how long the breaker stays open before a probe is allowed
   * @param exponentialBackoffFactor multiplier applied to the reset timeout after a failed probe
   * @param maxResetTimeout          upper bound for the reset timeout; a non-finite
   *                                 duration means no bound
   * @param onRejected               run before a call fails with [[RejectedExecution]]
   * @param onClosed                 run when a successful probe closes the breaker
   * @param onHalfOpen               run when a probe call is about to be attempted
   * @param onOpen                   run when the failure count reaches `maxFailures`
   */
  def of[F[_]](
    maxFailures: Int,
    resetTimeout: FiniteDuration,
    exponentialBackoffFactor: Double,
    maxResetTimeout: Duration,
    onRejected: F[Unit],
    onClosed: F[Unit],
    onHalfOpen: F[Unit],
    onOpen: F[Unit]
  )(implicit F: Sync[F], C: Clock[F]): F[CircuitBreaker[F]] =
    Ref.of[F, S](ClosedZero).map { ref =>
      new CircuitBreaker[F] {

        /** Current monotonic time in milliseconds. */
        private def currentTime: F[TimeStamp] =
          C.monotonic(TimeUnit.MILLISECONDS)

        /**
         * Atomically replaces the state and then runs the effect that was
         * selected for the state observed by that replacement.
         */
        private def transition[A](f: S => (S, F[A])): F[A] =
          F.flatten(ref.modify(f))

        /** Fires `onRejected` and fails with [[RejectedExecution]]. */
        private def reject[A](reason: Reason): F[A] =
          onRejected >> F.raiseError[A](RejectedExecution(reason))

        /**
         * Runs `f` while the breaker is closed: a success resets the failure
         * count, a failure increments it and opens the breaker once
         * `maxFailures` is reached. The error of `f` is always re-raised.
         */
        private def openOnFail[A](f: F[A]): F[A] =
          f.attempt.flatMap {
            case Right(a) =>
              ref.set(ClosedZero).as(a)
            case Left(err) =>
              currentTime.flatMap { now =>
                transition[A] {
                  case Closed(failures) =>
                    val count = failures + 1
                    if (count >= maxFailures) (Open(now, resetTimeout), onOpen >> F.raiseError[A](err))
                    else (Closed(count), F.raiseError[A](err))
                  // Someone else already moved the breaker on; only report the error.
                  case open: Open => (open, F.raiseError[A](err))
                  case HalfOpen   => (HalfOpen, F.raiseError[A](err))
                }
              }
          }

        /**
         * Open state to fall back to after a failed probe: the timeout is
         * multiplied by `exponentialBackoffFactor` (capped by a finite
         * `maxResetTimeout`) and the window restarts at `failedAt`, so the
         * breaker really stays open for the new timeout.
         */
        private def backoff(open: Open, failedAt: TimeStamp): Open = {
          val next = (open.resetTimeout.toMillis * exponentialBackoffFactor).millis
          val capped = maxResetTimeout match {
            case fin: FiniteDuration => next min fin
            case _                   => next
          }
          Open(failedAt, capped)
        }

        /**
         * Handles a call that observed the `open` state: rejects it while the
         * reset timeout has not elapsed, otherwise tries to run `fa` as the
         * single probe call.
         */
        private def tryReset[A](open: Open, fa: F[A]): F[A] =
          currentTime.flatMap { now =>
            if (open.startedAt + open.resetTimeout.toMillis >= now) reject[A](open)
            else {
              // Probe outcome: success closes the breaker, failure re-opens it with backoff.
              def resetOnSuccess: F[A] =
                fa.attempt.flatMap {
                  case Left(err) =>
                    currentTime.flatMap { failedAt =>
                      ref.set(backoff(open, failedAt)) >> F.raiseError[A](err)
                    }
                  case Right(a) =>
                    (onClosed >> ref.set(ClosedZero)).as(a)
                }

              transition[A] {
                case closed: Closed => (closed, openOnFail(fa))
                case current: Open =>
                  // Only the caller whose observed state is still current may probe;
                  // `current` is deliberately not named `open` to avoid shadowing it.
                  if (current == open) (HalfOpen, onHalfOpen >> resetOnSuccess)
                  else (current, reject[A](current))
                case HalfOpen => (HalfOpen, reject[A](HalfOpen))
              }
            }
          }

        def protect[A](fa: F[A]): F[A] =
          transition[A] {
            case closed: Closed => (closed, openOnFail(fa))
            case open: Open     => (open, tryReset(open, fa))
            case HalfOpen       => (HalfOpen, reject[A](HalfOpen))
          }

        def state: F[S] = ref.get
      }
    }
}
