Skip to content

Instantly share code, notes, and snippets.

@pchlupacek
Created September 19, 2018 07:55
Show Gist options
  • Save pchlupacek/3b0a9e7e8f4092583b872bb8dd4b18fa to your computer and use it in GitHub Desktop.
Save pchlupacek/3b0a9e7e8f4092583b872bb8dd4b18fa to your computer and use it in GitHub Desktop.
An attempt at a circuit breaker (CB) implemented with Ref.
/*
* Copyright (c) 2017-2018 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cats.effect.concurrent
import java.util.concurrent.TimeUnit
import cats.syntax.all._
import cats.effect.{Clock, Sync}
import scala.concurrent.duration._
/**
 * A circuit breaker for effects in `F`.
 *
 * @tparam F the effect type in which protected computations run
 */
trait CircuitBreaker[F[_]] {

  /**
   * Wraps `fa` so that it is evaluated under the control of this breaker.
   *
   * The returned effect either yields the outcome of `fa` or fails without
   * running it; the implementation provided by the companion object fails with
   * `CircuitBreaker.RejectedExecution` in that case.
   *
   * @param fa the effect to guard
   * @tparam A the result type of `fa`
   */
  def protect[A](fa: F[A]): F[A]

  /** Reads the current state of the breaker. */
  def state: F[CircuitBreaker.S]
}
/**
 * State model and constructors for [[CircuitBreaker]].
 *
 * The breaker built by `of` moves between three states:
 *
 *  - `Closed(failures)`: calls run; each failure increments the counter and any
 *    success resets it to zero. Reaching `maxFailures` switches to `Open`.
 *  - `Open(startedAt, resetTimeout)`: calls are rejected while
 *    `startedAt + resetTimeout >= now`. The first call after that window is let
 *    through as a probe and the state becomes `HalfOpen`.
 *  - `HalfOpen`: a probe is in flight and every other call is rejected. A
 *    successful probe closes the breaker; a failed one re-opens it with a
 *    reset timeout scaled by `exponentialBackoffFactor`.
 */
object CircuitBreaker {

  /** A clock reading in milliseconds, as produced by `Clock.monotonic(MILLISECONDS)`. */
  type TimeStamp = Long

  /** The state of a breaker. */
  sealed trait S

  /** A state that causes calls to be rejected; carried by [[RejectedExecution]]. */
  sealed trait Reason

  /** Calls are allowed; `failures` counts failures since the last success. */
  case class Closed(failures: Int) extends S

  /** Calls are rejected while `startedAt + resetTimeout >= now` (both in milliseconds). */
  case class Open(startedAt: TimeStamp, resetTimeout: FiniteDuration) extends S with Reason

  /** A single probe call is in flight; all other calls are rejected. */
  case object HalfOpen extends S with Reason

  private val ClosedZero = Closed(0)

  /** Raised in `F` when a call is rejected because the breaker is `Open` or `HalfOpen`. */
  case class RejectedExecution(reason: Reason) extends Throwable(s"Execution Rejected: $reason")

  /**
   * Builds a breaker whose state-transition callbacks do nothing.
   *
   * @param maxFailures              failure count at which a closed breaker opens
   * @param resetTimeout             how long the breaker stays open after tripping from `Closed`
   * @param exponentialBackoffFactor multiplier applied to the reset timeout after each failed probe
   * @param maxResetTimeout          upper bound for the scaled reset timeout; unbounded when not finite
   */
  def of[F[_]](
    maxFailures: Int,
    resetTimeout: FiniteDuration,
    exponentialBackoffFactor: Double = 1,
    maxResetTimeout: Duration = Duration.Inf
  )(implicit F: Sync[F], C: Clock[F]): F[CircuitBreaker[F]] =
    of(maxFailures, resetTimeout, exponentialBackoffFactor, maxResetTimeout, F.unit, F.unit, F.unit, F.unit)

  /**
   * Builds a breaker that starts in `Closed(0)`.
   *
   * @param maxFailures              failure count at which a closed breaker opens
   * @param resetTimeout             how long the breaker stays open after tripping from `Closed`
   * @param exponentialBackoffFactor multiplier applied to the reset timeout after each failed probe
   * @param maxResetTimeout          upper bound for the scaled reset timeout; unbounded when not finite
   * @param onRejected               run before every rejection is raised
   * @param onClosed                 run when a successful probe closes the breaker
   * @param onHalfOpen               run when a probe is admitted
   * @param onOpen                   run when a closed breaker trips open
   */
  def of[F[_]](
    maxFailures: Int,
    resetTimeout: FiniteDuration,
    exponentialBackoffFactor: Double,
    maxResetTimeout: Duration,
    onRejected: F[Unit],
    onClosed: F[Unit],
    onHalfOpen: F[Unit],
    onOpen: F[Unit]
  )(implicit F: Sync[F], C: Clock[F]): F[CircuitBreaker[F]] =
    Ref.of[F, S](ClosedZero).map { ref =>
      new CircuitBreaker[F] {

        /** Current monotonic time in milliseconds. */
        private def currentTime: F[TimeStamp] = C.monotonic(TimeUnit.MILLISECONDS)

        /** Runs the `onRejected` callback, then fails with [[RejectedExecution]]. */
        private def reject[A](reason: Reason): F[A] =
          onRejected >> F.raiseError[A](RejectedExecution(reason))

        /**
         * Runs `f` for a breaker that was observed `Closed`: success resets the
         * failure counter, failure increments it and opens the breaker once
         * `maxFailures` is reached. The original error is always re-raised.
         */
        private def openOnFail[A](f: F[A]): F[A] =
          f.attempt.flatMap {
            case Right(a) =>
              ref.set(ClosedZero).as(a)
            case Left(err) =>
              currentTime.flatMap { failedAt =>
                // `Ref` has no `modifyF`: compute the follow-up effect inside the
                // atomic `modify` and run it afterwards with `flatten`.
                ref.modify[F[A]] {
                  case Closed(failures) =>
                    val count = failures + 1
                    if (count >= maxFailures) (Open(failedAt, resetTimeout), onOpen >> F.raiseError[A](err))
                    else (Closed(count), F.raiseError[A](err))
                  // Another caller already changed the state; just surface the error.
                  case open: Open => (open, F.raiseError[A](err))
                  case HalfOpen   => (HalfOpen, F.raiseError[A](err))
                }.flatten
              }
          }

        /**
         * The `Open` state to enter after a failed probe: the reset timeout is
         * scaled by `exponentialBackoffFactor` (capped by a finite
         * `maxResetTimeout`) and the window restarts at `failedAt`, so the
         * breaker really stays open for the new timeout.
         */
        private def backoff(open: Open, failedAt: TimeStamp): Open = {
          val scaled = (open.resetTimeout.toMillis * exponentialBackoffFactor).millis
          val capped = maxResetTimeout match {
            case fin: FiniteDuration => scaled min fin
            case _                   => scaled
          }
          Open(failedAt, capped)
        }

        /**
         * Handles a call that observed the breaker in state `open`: rejects while
         * the reset window is still running, otherwise tries to become the probe.
         */
        private def tryReset[A](open: Open, fa: F[A]): F[A] =
          currentTime.flatMap { now =>
            if (open.startedAt + open.resetTimeout.toMillis >= now) reject[A](open)
            else {
              // Outcome of the probe: success closes the breaker, failure re-opens
              // it with a backed-off timeout (`onOpen` is not invoked on this path).
              // NOTE(review): nothing resets `HalfOpen` if the probe is cancelled
              // before it completes - confirm whether that needs handling.
              def resetOnSuccess: F[A] =
                fa.attempt.flatMap {
                  case Left(err) =>
                    currentTime.flatMap { failedAt =>
                      ref.set(backoff(open, failedAt)) >> F.raiseError[A](err)
                    }
                  case Right(a) =>
                    (onClosed >> ref.set(ClosedZero)).as(a)
                }

              ref.modify[F[A]] {
                case closed: Closed =>
                  (closed, openOnFail(fa))
                case current: Open =>
                  // Admit the probe only if the state is still the snapshot whose
                  // window was checked above; a different `Open` means another
                  // caller re-opened the breaker in the meantime.
                  if (current == open) (HalfOpen, onHalfOpen >> resetOnSuccess)
                  else (current, reject[A](current))
                case HalfOpen =>
                  (HalfOpen, reject[A](HalfOpen))
              }.flatten
            }
          }

        /** Dispatches on the current state; the state itself is only changed by the helpers. */
        def protect[A](fa: F[A]): F[A] =
          ref.get.flatMap {
            case _: Closed  => openOnFail(fa)
            case open: Open => tryReset(open, fa)
            case HalfOpen   => reject[A](HalfOpen)
          }

        def state: F[S] = ref.get
      }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment